diff --git a/src/java/kafkabridge/pom.xml b/src/java/kafkabridge/pom.xml index 718d661cac..12ba92b304 100644 --- a/src/java/kafkabridge/pom.xml +++ b/src/java/kafkabridge/pom.xml @@ -9,7 +9,7 @@ mases.kafkabridge Apache Kafka interface bridging implementation https://github.com/masesgroup/KafkaBridge - 1.1.7.0 + 1.1.8.0 diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AdminClientConfig.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AdminClientConfig.cs index c295b07db5..86c1c43f2b 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AdminClientConfig.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AdminClientConfig.cs @@ -16,11 +16,12 @@ * Refer to LICENSE for more information. */ +using MASES.KafkaBridge.Common.Config; using MASES.KafkaBridge.Java.Util; namespace MASES.KafkaBridge.Clients.Admin { - public class AdminClientConfig : JCOBridge.C2JBridge.JVMBridgeBase + public class AdminClientConfig : AbstractConfig { public override string ClassName => "org.apache.kafka.clients.admin.AdminClientConfig"; @@ -64,6 +65,8 @@ public class AdminClientConfig : JCOBridge.C2JBridge.JVMBridgeBase("RETRIES_CONFIG"); public static readonly string DEFAULT_API_TIMEOUT_MS_CONFIG = Clazz.GetField("DEFAULT_API_TIMEOUT_MS_CONFIG"); + public static readonly string SECURITY_PROVIDERS_CONFIG = Clazz.GetField("SECURITY_PROVIDERS_CONFIG"); + [System.Obsolete("This is not public in Apache Kafka API")] [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] public AdminClientConfig() { } @@ -73,4 +76,16 @@ public AdminClientConfig(Map props) { } } + + public class AdminClientConfigBuilder : CommonClientConfigsBuilder + { + public string SecurityProviders { get { return GetProperty(AdminClientConfig.SECURITY_PROVIDERS_CONFIG); } set { SetProperty(AdminClientConfig.SECURITY_PROVIDERS_CONFIG, value); } } + + public AdminClientConfigBuilder WithSecurityProviders(string securityProviders) + { + var clone = Clone(); + clone.SecurityProviders = securityProviders; + return clone; + } + } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterClientQuotasResult.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterClientQuotasResult.cs index 4f8d105cfc..e886ee5ff9 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterClientQuotasResult.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterClientQuotasResult.cs @@ -27,7 +27,7 @@ public class AlterClientQuotasResult : JCOBridge.C2JBridge.JVMBridgeBase "org.apache.kafka.clients.admin.AlterClientQuotasResult"; - public Map> values => IExecute>>("values"); + public Map> Values => IExecute>>("values"); public KafkaFuture All => IExecute>("all"); } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterUserScramCredentialsResult.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterUserScramCredentialsResult.cs index 1d9a963f8b..13f2876d97 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterUserScramCredentialsResult.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/AlterUserScramCredentialsResult.cs @@ -26,7 +26,7 @@ public class AlterUserScramCredentialsResult : JCOBridge.C2JBridge.JVMBridgeBase { public override string ClassName => "org.apache.kafka.clients.admin.AlterUserScramCredentialsResult"; - public Map> values => IExecute>>("values"); + public Map> Values => IExecute>>("values"); public KafkaFuture All => IExecute>("all"); } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/CreatePartitionsResult.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/CreatePartitionsResult.cs index 940cd6fa97..cea0b838f3 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/CreatePartitionsResult.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/CreatePartitionsResult.cs @@ -28,7 +28,7 @@ public class CreatePartitionsResult : JCOBridge.C2JBridge.JVMBridgeBase> Values => IExecute>>("values"); - public KafkaFuture All => IExecute < KafkaFuture>("all"); + public KafkaFuture All => IExecute>("all"); } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeConsumerGroupsResult.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeConsumerGroupsResult.cs index 45e0761e7d..822b81525b 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeConsumerGroupsResult.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeConsumerGroupsResult.cs @@ -25,7 +25,7 @@ public class DescribeConsumerGroupsResult : JCOBridge.C2JBridge.JVMBridgeBase "org.apache.kafka.clients.admin.DescribeConsumerGroupsResult"; - public Map> describedGroups => IExecute>>("describedGroups"); + public Map> DescribedGroups => IExecute>>("describedGroups"); public KafkaFuture> All => IExecute>>("all"); } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeLogDirsResult.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeLogDirsResult.cs index 86da490e6a..3b806c4615 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeLogDirsResult.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeLogDirsResult.cs @@ -26,10 +26,10 @@ public class DescribeLogDirsResult : JCOBridge.C2JBridge.JVMBridgeBase "org.apache.kafka.clients.admin.DescribeLogDirsResult"; - public Map>> values => IExecute>>>("descriptions"); + public Map>> Values => IExecute>>>("descriptions"); - public Map>> descriptions => IExecute>>>("descriptions"); + public Map>> Descriptions => IExecute>>>("descriptions"); - public KafkaFuture>> allDescriptions => IExecute>>>("allDescriptions"); + public KafkaFuture>> AllDescriptions => IExecute>>>("allDescriptions"); } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeUserScramCredentialsResult.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeUserScramCredentialsResult.cs index 64f937a629..b67a30d940 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeUserScramCredentialsResult.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/DescribeUserScramCredentialsResult.cs @@ -29,9 +29,9 @@ public class DescribeUserScramCredentialsResult : JCOBridge.C2JBridge.JVMBridgeB public KafkaFuture> Users => IExecute>>("users"); - public KafkaFuture description(string userName) + public KafkaFuture Description(string userName) { - return IExecute< KafkaFuture>("description"); + return IExecute< KafkaFuture>("description", userName); } } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/ListConsumerGroupOffsetsResult.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/ListConsumerGroupOffsetsResult.cs index 8d27798e14..4507eff165 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/ListConsumerGroupOffsetsResult.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/ListConsumerGroupOffsetsResult.cs @@ -26,6 +26,6 @@ public class ListConsumerGroupOffsetsResult : JCOBridge.C2JBridge.JVMBridgeBase< { public override string ClassName => "org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult"; - public KafkaFuture> partitionsToOffsetAndMetadata => IExecute>>("partitionsToOffsetAndMetadata"); + public KafkaFuture> PartitionsToOffsetAndMetadata => IExecute>>("partitionsToOffsetAndMetadata"); } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/NewTopic.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/NewTopic.cs index 507fad153f..78af827a04 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/NewTopic.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Admin/NewTopic.cs @@ -16,6 +16,7 @@ * Refer to LICENSE for more information. */ +using MASES.KafkaBridge.Common.Config; using MASES.KafkaBridge.Java.Util; namespace MASES.KafkaBridge.Clients.Admin @@ -24,6 +25,8 @@ public class NewTopic : JCOBridge.C2JBridge.JVMBridgeBase { public override string ClassName => "org.apache.kafka.clients.admin.NewTopic"; + [System.Obsolete("This is not public in Apache Kafka API", true)] + [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] public NewTopic() { } @@ -33,10 +36,30 @@ public NewTopic(string name, int numPartitions = 1, short replicationFactor = 1) { } + public NewTopic(string name, Map> replicasAssignments) + :base(name, replicasAssignments) + { + } + + public string Name => IExecute("name"); + + public int NumPartitions => IExecute("numPartitions"); + + public int ReplicationFactor => IExecute("replicationFactor"); + + public Map> ReplicasAssignments => IExecute>>("replicasAssignments"); + public NewTopic Configs(Map configs) { return IExecute("configs", configs); } + + public NewTopic Configs(TopicConfigBuilder config) + { + return Configs(config.ToMap()); + } + + public Map Configs() => IExecute>("configs"); } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/CommonClientConfigs.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/CommonClientConfigs.cs new file mode 100644 index 0000000000..4d837fe3d7 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/CommonClientConfigs.cs @@ -0,0 +1,401 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Common.Metrics; +using MASES.KafkaBridge.Java.Util; + +namespace MASES.KafkaBridge.Clients +{ + public class CommonClientConfigs : JCOBridge.C2JBridge.JVMBridgeBase + { + public override string ClassName => "org.apache.kafka.clients.CommonClientConfigs"; + + public static readonly string BOOTSTRAP_SERVERS_CONFIG = Clazz.GetField("BOOTSTRAP_SERVERS_CONFIG"); + + public static readonly string CLIENT_DNS_LOOKUP_CONFIG = Clazz.GetField("CLIENT_DNS_LOOKUP_CONFIG"); + + public static readonly string METADATA_MAX_AGE_CONFIG = Clazz.GetField("METADATA_MAX_AGE_CONFIG"); + + public static readonly string SEND_BUFFER_CONFIG = Clazz.GetField("SEND_BUFFER_CONFIG"); + + public static readonly string RECEIVE_BUFFER_CONFIG = Clazz.GetField("RECEIVE_BUFFER_CONFIG"); + + public static readonly string CLIENT_ID_CONFIG = Clazz.GetField("CLIENT_ID_CONFIG"); + + public static readonly string CLIENT_RACK_CONFIG = Clazz.GetField("CLIENT_RACK_CONFIG"); + + public static readonly string RECONNECT_BACKOFF_MS_CONFIG = Clazz.GetField("RECONNECT_BACKOFF_MS_CONFIG"); + + public static readonly string RECONNECT_BACKOFF_MAX_MS_CONFIG = Clazz.GetField("RECONNECT_BACKOFF_MAX_MS_CONFIG"); + + public static readonly string RETRIES_CONFIG = Clazz.GetField("RETRIES_CONFIG"); + + public static readonly string RETRY_BACKOFF_MS_CONFIG = Clazz.GetField("RETRY_BACKOFF_MS_CONFIG"); + + public static readonly string METRICS_SAMPLE_WINDOW_MS_CONFIG = Clazz.GetField("METRICS_SAMPLE_WINDOW_MS_CONFIG"); + + public static readonly string METRICS_NUM_SAMPLES_CONFIG = Clazz.GetField("METRICS_NUM_SAMPLES_CONFIG"); + + public static readonly string METRICS_RECORDING_LEVEL_CONFIG = Clazz.GetField("METRICS_RECORDING_LEVEL_CONFIG"); + + public static readonly string METRIC_REPORTER_CLASSES_CONFIG = Clazz.GetField("METRIC_REPORTER_CLASSES_CONFIG"); + + public static readonly string SECURITY_PROTOCOL_CONFIG = Clazz.GetField("SECURITY_PROTOCOL_CONFIG"); + + public static readonly string SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = Clazz.GetField("SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG"); + + public static readonly string SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = Clazz.GetField("SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG"); + + public static readonly string CONNECTIONS_MAX_IDLE_MS_CONFIG = Clazz.GetField("CONNECTIONS_MAX_IDLE_MS_CONFIG"); + + public static readonly string REQUEST_TIMEOUT_MS_CONFIG = Clazz.GetField("REQUEST_TIMEOUT_MS_CONFIG"); + + public static readonly string DEFAULT_LIST_KEY_SERDE_INNER_CLASS = Clazz.GetField("DEFAULT_LIST_KEY_SERDE_INNER_CLASS"); + + public static readonly string DEFAULT_LIST_VALUE_SERDE_INNER_CLASS = Clazz.GetField("DEFAULT_LIST_VALUE_SERDE_INNER_CLASS"); + + public static readonly string DEFAULT_LIST_KEY_SERDE_TYPE_CLASS = Clazz.GetField("DEFAULT_LIST_KEY_SERDE_TYPE_CLASS"); + + public static readonly string DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS = Clazz.GetField("DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS"); + + public static readonly string GROUP_ID_CONFIG = Clazz.GetField("GROUP_ID_CONFIG"); + + public static readonly string GROUP_INSTANCE_ID_CONFIG = Clazz.GetField("GROUP_INSTANCE_ID_CONFIG"); + + public static readonly string MAX_POLL_INTERVAL_MS_CONFIG = Clazz.GetField("MAX_POLL_INTERVAL_MS_CONFIG"); + + public static readonly string REBALANCE_TIMEOUT_MS_CONFIG = Clazz.GetField("REBALANCE_TIMEOUT_MS_CONFIG"); + + public static readonly string SESSION_TIMEOUT_MS_CONFIG = Clazz.GetField("SESSION_TIMEOUT_MS_CONFIG"); + + public static readonly string HEARTBEAT_INTERVAL_MS_CONFIG = Clazz.GetField("HEARTBEAT_INTERVAL_MS_CONFIG"); + + public static readonly string DEFAULT_API_TIMEOUT_MS_CONFIG = Clazz.GetField("DEFAULT_API_TIMEOUT_MS_CONFIG"); + } + + public abstract class CommonClientConfigsBuilder : GenericConfigBuilder + where T : CommonClientConfigsBuilder, new() + { + public string BootstrapServers { get { return GetProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); } set { SetProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, value); } } + + public T WithBootstrapServers(string bootstrapServers) + { + var clone = Clone(); + clone.BootstrapServers = bootstrapServers; + return clone; + } + + public string ClientDnsLookup { get { return GetProperty(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG); } set { SetProperty(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, value); } } + + public T WithClientDnsLookup(string clientDnsLookup) + { + var clone = Clone(); + clone.ClientDnsLookup = clientDnsLookup; + return clone; + } + + public long MetadataMaxAge { get { return GetProperty(CommonClientConfigs.METADATA_MAX_AGE_CONFIG); } set { SetProperty(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, value); } } + + public T WithMetadataMaxAge(long metadataMaxAge) + { + var clone = Clone(); + clone.MetadataMaxAge = metadataMaxAge; + return clone; + } + + public int SendBuffer { get { return GetProperty(CommonClientConfigs.SEND_BUFFER_CONFIG); } set { SetProperty(CommonClientConfigs.SEND_BUFFER_CONFIG, value); } } + + public T WithSendBuffer(int sendBuffer) + { + var clone = Clone(); + clone.SendBuffer = sendBuffer; + return clone; + } + + public int ReceiveBuffer { get { return GetProperty(CommonClientConfigs.RECEIVE_BUFFER_CONFIG); } set { SetProperty(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, value); } } + + public T WithReceiveBuffer(int receiveBuffer) + { + var clone = Clone(); + clone.ReceiveBuffer = receiveBuffer; + return clone; + } + + public string ClientId { get { return GetProperty(CommonClientConfigs.CLIENT_ID_CONFIG); } set { SetProperty(CommonClientConfigs.CLIENT_ID_CONFIG, value); } } + + public T WithClientId(string clientId) + { + var clone = Clone(); + clone.ClientId = clientId; + return clone; + } + + public string ClientRack { get { return GetProperty(CommonClientConfigs.CLIENT_RACK_CONFIG); } set { SetProperty(CommonClientConfigs.CLIENT_RACK_CONFIG, value); } } + + public T WithClientRack(string clientRack) + { + var clone = Clone(); + clone.ClientRack = clientRack; + return clone; + } + + public long ReconnectBackoffMs { get { return GetProperty(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG); } set { SetProperty(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, value); } } + + public T WithReconnectBackoffMs(long reconnectBackoffMs) + { + var clone = Clone(); + clone.ReconnectBackoffMs = reconnectBackoffMs; + return clone; + } + + public long ReconnectBackoffMaxMs { get { return GetProperty(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG); } set { SetProperty(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, value); } } + + public T WithReconnectBackoffMaxMs(long reconnectBackoffMaxMs) + { + var clone = Clone(); + clone.ReconnectBackoffMaxMs = reconnectBackoffMaxMs; + return clone; + } + + public int Retries { get { return GetProperty(CommonClientConfigs.RETRIES_CONFIG); } set { SetProperty(CommonClientConfigs.RETRIES_CONFIG, value); } } + + public T WithRetries(int retries) + { + var clone = Clone(); + clone.Retries = retries; + return clone; + } + + public long RetryBackoffMs { get { return GetProperty(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); } set { SetProperty(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, value); } } + + public T WithRetryBackoffMs(long retryBackoffMs) + { + var clone = Clone(); + clone.RetryBackoffMs = retryBackoffMs; + return clone; + } + + public long MetricSampleWindowMs { get { return GetProperty(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG); } set { SetProperty(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG, value); } } + + public T WithMetricSampleWindowMs(long metricSampleWindowMs) + { + var clone = Clone(); + clone.MetricSampleWindowMs = metricSampleWindowMs; + return clone; + } + + public int MetricNumSample { get { return GetProperty(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG); } set { SetProperty(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, value); } } + + public T WithMetricNumSample(int metricNumSample) + { + var clone = Clone(); + clone.MetricNumSample = metricNumSample; + return clone; + } + + // Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString() + public Sensor.RecordingLevel MetricRecordingLevel + { + get + { + var strName = GetProperty(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG); + if (System.Enum.GetName(typeof(Sensor.RecordingLevel), Sensor.RecordingLevel.DEBUG) == strName) + return Sensor.RecordingLevel.DEBUG; + else if (System.Enum.GetName(typeof(Sensor.RecordingLevel), Sensor.RecordingLevel.INFO) == strName) + return Sensor.RecordingLevel.INFO; + else if (System.Enum.GetName(typeof(Sensor.RecordingLevel), Sensor.RecordingLevel.TRACE) == strName) + return Sensor.RecordingLevel.TRACE; + else return Sensor.RecordingLevel.INFO; + } + set + { + SetProperty(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG, System.Enum.GetName(typeof(Sensor.RecordingLevel), value)); + } + } + + public T WithMetricRecordingLevel(Sensor.RecordingLevel metricRecordingLevel) + { + var clone = Clone(); + clone.MetricRecordingLevel = metricRecordingLevel; + return clone; + } + + [System.Obsolete("To be checked")] + public List MetricReporterClasses { get { return GetProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG); } set { SetProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public T WithMetricReporterClasses(List metricReporterClasses) + { + var clone = Clone(); + clone.MetricReporterClasses = metricReporterClasses; + return clone; + } + + public string SecurityProtocol { get { return GetProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); } set { SetProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, value); } } + + public T WithSecurityProtocol(string securityProtocol) + { + var clone = Clone(); + clone.SecurityProtocol = securityProtocol; + return clone; + } + + public long SocketConnectionSetupTimeoutMs { get { return GetProperty(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG); } set { SetProperty(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, value); } } + + public T WithSocketConnectionSetupTimeoutMs(long socketConnectionSetupTimeoutMs) + { + var clone = Clone(); + clone.SocketConnectionSetupTimeoutMs = socketConnectionSetupTimeoutMs; + return clone; + } + + public long SocketConnectionSetupTimeoutMaxMs { get { return GetProperty(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG); } set { SetProperty(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, value); } } + + public T WithSocketConnectionSetupTimeoutMaxMs(long socketConnectionSetupTimeoutMaxMs) + { + var clone = Clone(); + clone.SocketConnectionSetupTimeoutMaxMs = socketConnectionSetupTimeoutMaxMs; + return clone; + } + + public long ConnectionMaxIdleMs { get { return GetProperty(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG); } set { SetProperty(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, value); } } + + public T WithConnectionMaxIdleMs(long connectionMaxIdleMs) + { + var clone = Clone(); + clone.ConnectionMaxIdleMs = connectionMaxIdleMs; + return clone; + } + + public int RequestTimeoutMs { get { return GetProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG); } set { SetProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, value); } } + + public T WithRequestTimeoutMs(int requestTimeoutMs) + { + var clone = Clone(); + clone.RequestTimeoutMs = requestTimeoutMs; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultListKeySerdeInnerClass { get { return GetProperty(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS); } set { SetProperty(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, value); } } + + [System.Obsolete("To be checked")] + public T WithDefaultListKeySerdeInnerClass(dynamic defaultListKeySerdeInnerClass) + { + var clone = Clone(); + clone.DefaultListKeySerdeInnerClass = defaultListKeySerdeInnerClass; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultListValueSerdeInnerClass { get { return GetProperty(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS); } set { SetProperty(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, value); } } + + [System.Obsolete("To be checked")] + public T WithDefaultListValueSerdeInnerClass(dynamic defaultListValueSerdeInnerClass) + { + var clone = Clone(); + clone.DefaultListValueSerdeInnerClass = defaultListValueSerdeInnerClass; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultListKeySerdeTypeClass { get { return GetProperty(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS); } set { SetProperty(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, value); } } + + [System.Obsolete("To be checked")] + public T WithDefaultListKeySerdeTypeClass(dynamic defaultListKeySerdeTypeClass) + { + var clone = Clone(); + clone.DefaultListKeySerdeTypeClass = defaultListKeySerdeTypeClass; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultListValueSerdeTypeClass { get { return GetProperty(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS); } set { SetProperty(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, value); } } + + [System.Obsolete("To be checked")] + public T WithDefaultListValueSerdeTypeClass(dynamic defaultListValueSerdeTypeClass) + { + var clone = Clone(); + clone.DefaultListValueSerdeTypeClass = defaultListValueSerdeTypeClass; + return clone; + } + + public string GroupId { get { return GetProperty(CommonClientConfigs.GROUP_ID_CONFIG); } set { SetProperty(CommonClientConfigs.GROUP_ID_CONFIG, value); } } + + public T WithGroupId(string groupId) + { + var clone = Clone(); + clone.GroupId = groupId; + return clone; + } + + public string GroupInstanceId { get { return GetProperty(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG); } set { SetProperty(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG, value); } } + + public T WithGroupInstanceId(string groupInstanceId) + { + var clone = Clone(); + clone.GroupInstanceId = groupInstanceId; + return clone; + } + + public int MaxPollIntervalMs { get { return GetProperty(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); } set { SetProperty(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG, value); } } + + public T WithMaxPollIntervalMs(int maxPollIntervalMs) + { + var clone = Clone(); + clone.MaxPollIntervalMs = maxPollIntervalMs; + return clone; + } + + public int RebalanceTimeoutMs { get { return GetProperty(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG); } set { SetProperty(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG, value); } } + + public T WithRebalanceTimeoutMs(int rebalanceTimeoutMs) + { + var clone = Clone(); + clone.RebalanceTimeoutMs = rebalanceTimeoutMs; + return clone; + } + + public int SessionTimeoutMs { get { return GetProperty(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG); } set { SetProperty(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, value); } } + + public T WithSessionTimeoutMs(int sessionTimeoutMs) + { + var clone = Clone(); + clone.SessionTimeoutMs = sessionTimeoutMs; + return clone; + } + + public int HeartbeatIntervalMs { get { return GetProperty(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG); } set { SetProperty(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG, value); } } + + public T WithHeartbeatIntervalMs(int heartbeatIntervalMs) + { + var clone = Clone(); + clone.HeartbeatIntervalMs = heartbeatIntervalMs; + return clone; + } + + public long DefaultApiTimeoutMs { get { return GetProperty(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG); } set { SetProperty(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG, value); } } + + public T WithDefaultApiTimeoutMs(long defaultApiTimeoutMs) + { + var clone = Clone(); + clone.DefaultApiTimeoutMs = defaultApiTimeoutMs; + return clone; + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerConfig.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerConfig.cs index e61361e069..38e202e879 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerConfig.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Consumer/ConsumerConfig.cs @@ -16,12 +16,22 @@ * Refer to LICENSE for more information. */ +using MASES.KafkaBridge.Common; +using MASES.KafkaBridge.Common.Config; +using MASES.KafkaBridge.Common.Metrics; using MASES.KafkaBridge.Java.Util; namespace MASES.KafkaBridge.Clients.Consumer { - public class ConsumerConfig : JCOBridge.C2JBridge.JVMBridgeBase + public class ConsumerConfig : AbstractConfig { + public enum AutoOffsetReset + { + None, + EARLIEST, + LATEST + } + public override string ClassName => "org.apache.kafka.clients.consumer.ConsumerConfig"; public static readonly string GROUP_ID_CONFIG = Clazz.GetField("GROUP_ID_CONFIG"); @@ -54,7 +64,6 @@ public class ConsumerConfig : JCOBridge.C2JBridge.JVMBridgeBase public static readonly int DEFAULT_FETCH_MAX_BYTES = Clazz.GetField("DEFAULT_FETCH_MAX_BYTES"); - public static readonly string FETCH_MAX_WAIT_MS_CONFIG = Clazz.GetField("FETCH_MAX_WAIT_MS_CONFIG"); public static readonly string METADATA_MAX_AGE_CONFIG = Clazz.GetField("METADATA_MAX_AGE_CONFIG"); @@ -108,6 +117,7 @@ public class ConsumerConfig : JCOBridge.C2JBridge.JVMBridgeBase public static readonly bool DEFAULT_EXCLUDE_INTERNAL_TOPICS = Clazz.GetField("DEFAULT_EXCLUDE_INTERNAL_TOPICS"); public static readonly string ISOLATION_LEVEL_CONFIG = Clazz.GetField("ISOLATION_LEVEL_CONFIG"); + public static readonly string DEFAULT_ISOLATION_LEVEL = Clazz.GetField("DEFAULT_ISOLATION_LEVEL"); public static readonly string ALLOW_AUTO_CREATE_TOPICS_CONFIG = Clazz.GetField("ALLOW_AUTO_CREATE_TOPICS_CONFIG"); @@ -130,4 +140,196 @@ public ConsumerConfig(Map props) { } } + + public class ConsumerConfigBuilder : CommonClientConfigsBuilder + { + public int MaxPollRecords { get { return GetProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); } set { SetProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, value); } } + + public ConsumerConfigBuilder WithMaxPollRecords(int maxPollRecords) + { + var clone = Clone(); + clone.MaxPollRecords = maxPollRecords; + return clone; + } + + public bool EnableAutoCommit { get { return GetProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); } set { SetProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, value); } } + + public ConsumerConfigBuilder WithEnableAutoCommit(bool enableAutoCommit) + { + var clone = Clone(); + clone.EnableAutoCommit = enableAutoCommit; + return clone; + } + + public int AutoCommitIntervalMs { get { return GetProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); } set { SetProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, value); } } + + public ConsumerConfigBuilder WithAutoCommitIntervalMs(int autoCommitIntervalMs) + { + var clone = Clone(); + clone.AutoCommitIntervalMs = autoCommitIntervalMs; + return clone; + } + + public string PartitionAssignmentStrategy { get { return GetProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG); } set { SetProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, value); } } + + public ConsumerConfigBuilder WithPartitionAssignmentStrategy(string partitionAssignmentStrategy) + { + var clone = Clone(); + clone.PartitionAssignmentStrategy = partitionAssignmentStrategy; + return clone; + } + + // "latest", "earliest", "none" + public ConsumerConfig.AutoOffsetReset AutoOffsetReset + { + get + { + var strName = GetProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + if (System.Enum.GetName(typeof(ConsumerConfig.AutoOffsetReset), ConsumerConfig.AutoOffsetReset.None).ToLowerInvariant() == strName) + return ConsumerConfig.AutoOffsetReset.None; + else if (System.Enum.GetName(typeof(ConsumerConfig.AutoOffsetReset), ConsumerConfig.AutoOffsetReset.EARLIEST).ToLowerInvariant() == strName) + return ConsumerConfig.AutoOffsetReset.EARLIEST; + else if (System.Enum.GetName(typeof(ConsumerConfig.AutoOffsetReset), ConsumerConfig.AutoOffsetReset.LATEST).ToLowerInvariant() == strName) + return ConsumerConfig.AutoOffsetReset.LATEST; + else return ConsumerConfig.AutoOffsetReset.LATEST; + } + set + { + SetProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, System.Enum.GetName(typeof(Sensor.RecordingLevel), value).ToLowerInvariant()); + } + } + + public ConsumerConfigBuilder WithAutoOffsetReset(ConsumerConfig.AutoOffsetReset autoOffsetReset) + { + var clone = Clone(); + clone.AutoOffsetReset = autoOffsetReset; + return clone; + } + + public int FetchMinBytes { get { return GetProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG); } set { SetProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, value); } } + + public ConsumerConfigBuilder WithFetchMinBytes(int fetchMinBytes) + { + var clone = Clone(); + clone.FetchMinBytes = fetchMinBytes; + return clone; + } + + public int FetchMaxBytes { get { return GetProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG); } set { SetProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, value); } } + + public ConsumerConfigBuilder WithFetchMaxBytes(int fetchMaxBytes) + { + var clone = Clone(); + clone.FetchMaxBytes = fetchMaxBytes; + return clone; + } + + public int FetchMaxWaitMs { get { return GetProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); } set { SetProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, value); } } + + public ConsumerConfigBuilder WithFetchMaxWaitMs(int fetchMaxWaitMs) + { + var clone = Clone(); + clone.FetchMaxWaitMs = fetchMaxWaitMs; + return clone; + } + + public int MaxPartitionFetchBytes { get { return GetProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); } set { SetProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, value); } } + + public ConsumerConfigBuilder WithMaxPartitionFetchBytes(int maxPartitionFetchBytes) + { + var clone = Clone(); + clone.MaxPartitionFetchBytes = maxPartitionFetchBytes; + return clone; + } + + public bool CheckCrcs { get { return GetProperty(ConsumerConfig.CHECK_CRCS_CONFIG); } set { SetProperty(ConsumerConfig.CHECK_CRCS_CONFIG, value); } } + + public ConsumerConfigBuilder WithCheckCrcs(bool checkCrcs) + { + var clone = Clone(); + clone.CheckCrcs = checkCrcs; + return clone; + } + + public string KeyDeserializerClass { get { return GetProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); } set { SetProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, value); } } + + public ConsumerConfigBuilder WithKeyDeserializerClass(string keyDeserializerClass) + { + var clone = Clone(); + clone.KeyDeserializerClass = keyDeserializerClass; + return clone; + } + + public string ValueDeserializerClass { get { return GetProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); } set { SetProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value); } } + + public ConsumerConfigBuilder WithValueDeserializerClass(string valueDeserializerClass) + { + var clone = Clone(); + clone.ValueDeserializerClass = valueDeserializerClass; + return clone; + } + + [System.Obsolete("To be checked")] + public List InterceptorClasses { get { return GetProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG); } set { SetProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public ConsumerConfigBuilder WithInterceptorClasses(List interceptorClasses) + { + var clone = Clone(); + clone.InterceptorClasses = interceptorClasses; + return clone; + } + + public bool ExcludeInternalTopics { get { return GetProperty(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG); } set { SetProperty(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, value); } } + + public ConsumerConfigBuilder WithExcludeInternalTopics(bool excludeInternalTopics) + { + var clone = Clone(); + clone.ExcludeInternalTopics = excludeInternalTopics; + return clone; + } + + // IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT) + public IsolationLevel IsolationLevel + { + get + { + var strName = GetProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG); + if (System.Enum.GetName(typeof(IsolationLevel), IsolationLevel.READ_COMMITTED).ToLowerInvariant() == strName) + return IsolationLevel.READ_COMMITTED; + else if (System.Enum.GetName(typeof(IsolationLevel), IsolationLevel.READ_UNCOMMITTED).ToLowerInvariant() == strName) + return IsolationLevel.READ_UNCOMMITTED; + else return IsolationLevel.READ_UNCOMMITTED; + } + set + { + SetProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, System.Enum.GetName(typeof(IsolationLevel), value).ToLowerInvariant()); + } + } + + public ConsumerConfigBuilder WithIsolationLevel(IsolationLevel isolationLevel) + { + var clone = Clone(); + clone.IsolationLevel = isolationLevel; + return clone; + } + + public bool AllowAutoCreateTopics { get { return GetProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); } set { SetProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, value); } } + + public ConsumerConfigBuilder WithAllowAutoCreateTopics(bool allowAutoCreateTopics) + { + var clone = Clone(); + clone.AllowAutoCreateTopics = allowAutoCreateTopics; + return clone; + } + + public string SecurityProviders { get { return GetProperty(ConsumerConfig.SECURITY_PROVIDERS_CONFIG); } set { SetProperty(ConsumerConfig.SECURITY_PROVIDERS_CONFIG, value); } } + + public ConsumerConfigBuilder WithSecurityProviders(string securityProviders) + { + var clone = Clone(); + clone.SecurityProviders = securityProviders; + return clone; + } + } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/ProducerConfig.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/ProducerConfig.cs index b1c94d5cce..d7001a59c2 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/ProducerConfig.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Clients/Producer/ProducerConfig.cs @@ -16,12 +16,30 @@ * Refer to LICENSE for more information. */ +using MASES.KafkaBridge.Common.Config; using MASES.KafkaBridge.Java.Util; namespace MASES.KafkaBridge.Clients.Producer { - public class ProducerConfig : JCOBridge.C2JBridge.JVMBridgeBase + public class ProducerConfig : AbstractConfig { + public enum Acks + { + All, + MinusOne, + None, + One + } + + public enum CompressionType + { + none, + gzip, + snappy, + lz4, + zstd + } + public override string ClassName => "org.apache.kafka.clients.producer.ProducerConfig"; public static readonly string BOOTSTRAP_SERVERS_CONFIG = Clazz.GetField("BOOTSTRAP_SERVERS_CONFIG"); @@ -110,4 +128,211 @@ public ProducerConfig(Map props) { } } + + public class ProducerConfigBuilder : CommonClientConfigsBuilder + { + public long MetadataMaxIdle { get { return GetProperty(ProducerConfig.METADATA_MAX_IDLE_CONFIG); } set { SetProperty(ProducerConfig.METADATA_MAX_IDLE_CONFIG, value); } } + + public ProducerConfigBuilder WithMetadataMaxIdle(long metadataMaxIdle) + { + var clone = Clone(); + clone.MetadataMaxIdle = metadataMaxIdle; + return clone; + } + + public int BatchSize { get { return GetProperty(ProducerConfig.BATCH_SIZE_CONFIG); } set { SetProperty(ProducerConfig.BATCH_SIZE_CONFIG, value); } } + + public ProducerConfigBuilder WithBatchSize(int batchSize) + { + var clone = Clone(); + clone.BatchSize = batchSize; + return clone; + } + + // "all", "-1", "0", "1" + public ProducerConfig.Acks Acks + { + get + { + var strName = GetProperty(ProducerConfig.ACKS_CONFIG); + if (strName == "all") return ProducerConfig.Acks.All; + else if (strName == "-1") return ProducerConfig.Acks.MinusOne; + else if (strName == "0") return ProducerConfig.Acks.None; + else if (strName == "1") return ProducerConfig.Acks.One; + + return ProducerConfig.Acks.None; + } + set + { + string str = value switch + { + ProducerConfig.Acks.All => "all", + ProducerConfig.Acks.MinusOne => "-1", + ProducerConfig.Acks.None => "0", + ProducerConfig.Acks.One => "1", + _ => "all", + }; + SetProperty(ProducerConfig.ACKS_CONFIG, str); + } + } + + public ProducerConfigBuilder WithAcks(ProducerConfig.Acks acks) + { + var clone = Clone(); + clone.Acks = acks; + return clone; + } + + public long LingerMs { get { return GetProperty(ProducerConfig.LINGER_MS_CONFIG); } set { SetProperty(ProducerConfig.LINGER_MS_CONFIG, value); } } + + public ProducerConfigBuilder WithLingerMs(long lingerMs) + { + var clone = Clone(); + clone.LingerMs = lingerMs; + return clone; + } + + public int DeliveryTimeoutMs { get { return GetProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); } set { SetProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, value); } } + + public ProducerConfigBuilder WithDeliveryTimeoutMs(int deliveryTimeoutMs) + { + var clone = Clone(); + clone.DeliveryTimeoutMs = deliveryTimeoutMs; + return clone; + } + + public int MaxRequestSize { get { return GetProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); } set { SetProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, value); } } + + public ProducerConfigBuilder WithMaxRequestSize(int maxRequestSize) + { + var clone = Clone(); + clone.MaxRequestSize = maxRequestSize; + return clone; + } + + public long MaxBlockMs { get { return GetProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG); } set { SetProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, value); } } + + public ProducerConfigBuilder WithMaxBlockMs(long maxBlockMs) + { + var clone = Clone(); + clone.MaxBlockMs = maxBlockMs; + return clone; + } + + public long BufferMemory { get { return GetProperty(ProducerConfig.BUFFER_MEMORY_CONFIG); } set { SetProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, value); } } + + public ProducerConfigBuilder WithBufferMemory(long bufferMemory) + { + var clone = Clone(); + clone.BufferMemory = bufferMemory; + return clone; + } + + public ProducerConfig.CompressionType CompressionType + { + get + { + var strName = GetProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG); + if (System.Enum.TryParse(strName, out var rest)) + { + return rest; + } + return ProducerConfig.CompressionType.none; + } + set + { + SetProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, System.Enum.GetName(typeof(ProducerConfig.CompressionType), value).ToLowerInvariant()); + } + } + + public ProducerConfigBuilder WithCompressionType(ProducerConfig.CompressionType compressionType) + { + var clone = Clone(); + clone.CompressionType = compressionType; + return clone; + } + + public int MaxInFlightRequestPerConnection { get { return GetProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); } set { SetProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, value); } } + + public ProducerConfigBuilder WitMaxInFlightRequestPerConnection(int maxInFlightRequestPerConnection) + { + var clone = Clone(); + clone.MaxInFlightRequestPerConnection = maxInFlightRequestPerConnection; + return clone; + } + + public string KeySerializerClass { get { return GetProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); } set { SetProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, value); } } + + public ProducerConfigBuilder WithKeySerializerClass(string keySerializerClass) + { + var clone = Clone(); + clone.KeySerializerClass = keySerializerClass; + return clone; + } + + public string ValueSerializerClass { get { return GetProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); } set { SetProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, value); } } + + public ProducerConfigBuilder WithValueSerializerClass(string valueSerializerClass) + { + var clone = Clone(); + clone.ValueSerializerClass = valueSerializerClass; + return clone; + } + + public dynamic PartitionerClass { get => GetProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG); set { SetProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, value); } } + + public ProducerConfigBuilder WithPartitionerClass(dynamic partitionerClass) + { + var clone = Clone(); + clone.PartitionerClass = partitionerClass; + return clone; + } + + [System.Obsolete("To be checked")] + public List InterceptorClasses { get { return GetProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG); } set { SetProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public ProducerConfigBuilder WithInterceptorClasses(List interceptorClasses) + { + var clone = Clone(); + clone.InterceptorClasses = interceptorClasses; + return clone; + } + + public bool EnableIdempotence { get { return GetProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG); } set { SetProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, value); } } + + public ProducerConfigBuilder WithEnableIdempotence(bool enableIdempotence) + { + var clone = Clone(); + clone.EnableIdempotence = enableIdempotence; + return clone; + } + + public int TransactionTimeout { get { return GetProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); } set { SetProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, value); } } + + public ProducerConfigBuilder WitTransactionTimeout(int transactionTimeout) + { + var clone = Clone(); + clone.TransactionTimeout = transactionTimeout; + return clone; + } + + public string TransactionalId { get { return GetProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG); } set { SetProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, value); } } + + public ProducerConfigBuilder WitTransactionalId(string transactionalId) + { + var clone = Clone(); + clone.TransactionalId = transactionalId; + return clone; + } + + public string SecurityProviders { get { return GetProperty(ProducerConfig.SECURITY_PROVIDERS_CONFIG); } set { SetProperty(ProducerConfig.SECURITY_PROVIDERS_CONFIG, value); } } + + public ProducerConfigBuilder WithSecurityProviders(string securityProviders) + { + var clone = Clone(); + clone.SecurityProviders = securityProviders; + return clone; + } + } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Config/AbstractConfig.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Config/AbstractConfig.cs new file mode 100644 index 0000000000..0bd8c09edc --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Config/AbstractConfig.cs @@ -0,0 +1,39 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KafkaBridge.Common.Config +{ + public class AbstractConfig : JVMBridgeBase + where TClass : IJVMBridgeBase, new() + { + public override string ClassName => "org.apache.kafka.common.config.AbstractConfig"; + + [System.Obsolete("This is not public in Apache Kafka API")] + [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] + public AbstractConfig() + { + } + + public AbstractConfig(params object[] args) + : base(args) + { + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Config/TopicConfig.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Config/TopicConfig.cs index 7eb27990af..6ac6b72dd1 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Config/TopicConfig.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Common/Config/TopicConfig.cs @@ -16,6 +16,8 @@ * Refer to LICENSE for more information. */ +using MASES.KafkaBridge.Clients.Producer; + namespace MASES.KafkaBridge.Common.Config { public class TopicConfig : JCOBridge.C2JBridge.JVMBridgeBase @@ -58,6 +60,13 @@ public class TopicConfig : JCOBridge.C2JBridge.JVMBridgeBase public static readonly string MAX_COMPACTION_LAG_MS_CONFIG = Clazz.GetField("MAX_COMPACTION_LAG_MS_CONFIG"); public static readonly string MIN_CLEANABLE_DIRTY_RATIO_CONFIG = Clazz.GetField("MIN_CLEANABLE_DIRTY_RATIO_CONFIG"); + [System.Flags] + public enum CleanupPolicy + { + None = 0, + Compact = 0x1, + Delete = 0x2 + } public static readonly string CLEANUP_POLICY_CONFIG = Clazz.GetField("CLEANUP_POLICY_CONFIG"); public static readonly string CLEANUP_POLICY_COMPACT = Clazz.GetField("CLEANUP_POLICY_COMPACT"); @@ -67,6 +76,16 @@ public class TopicConfig : JCOBridge.C2JBridge.JVMBridgeBase public static readonly string MIN_IN_SYNC_REPLICAS_CONFIG = Clazz.GetField("MIN_IN_SYNC_REPLICAS_CONFIG"); + public enum CompressionType + { + uncompressed, + gzip, + snappy, + lz4, + zstd, + producer + } + public static readonly string COMPRESSION_TYPE_CONFIG = Clazz.GetField("COMPRESSION_TYPE_CONFIG"); public static readonly string PREALLOCATE_CONFIG = Clazz.GetField("PREALLOCATE_CONFIG"); @@ -74,6 +93,12 @@ public class TopicConfig : JCOBridge.C2JBridge.JVMBridgeBase [System.Obsolete()] public static readonly string MESSAGE_FORMAT_VERSION_CONFIG = Clazz.GetField("MESSAGE_FORMAT_VERSION_CONFIG"); + public enum MessageTimestampType + { + CreateTime, + LogAppendTime, + } + public static readonly string MESSAGE_TIMESTAMP_TYPE_CONFIG = Clazz.GetField("MESSAGE_TIMESTAMP_TYPE_CONFIG"); public static readonly string MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = Clazz.GetField("MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG"); @@ -81,4 +106,289 @@ public class TopicConfig : JCOBridge.C2JBridge.JVMBridgeBase public static readonly string MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = Clazz.GetField("MESSAGE_DOWNCONVERSION_ENABLE_CONFIG"); } + + public class TopicConfigBuilder : GenericConfigBuilder + { + public int SegmentBytes { get { return GetProperty(TopicConfig.SEGMENT_BYTES_CONFIG); } set { SetProperty(TopicConfig.SEGMENT_BYTES_CONFIG, value); } } + + public TopicConfigBuilder WithSegmentBytes(int segmentBytes) + { + var clone = Clone(); + clone.SegmentBytes = segmentBytes; + return clone; + } + + public int SegmentMs { get { return GetProperty(TopicConfig.SEGMENT_MS_CONFIG); } set { SetProperty(TopicConfig.SEGMENT_MS_CONFIG, value); } } + + public TopicConfigBuilder WithSegmentMs(int segmentMs) + { + var clone = Clone(); + clone.SegmentMs = segmentMs; + return clone; + } + + public int SegmentJitterMs { get { return GetProperty(TopicConfig.SEGMENT_JITTER_MS_CONFIG); } set { SetProperty(TopicConfig.SEGMENT_JITTER_MS_CONFIG, value); } } + + public TopicConfigBuilder WithSegmentJitterMs(int segmentJitterMs) + { + var clone = Clone(); + clone.SegmentJitterMs = segmentJitterMs; + return clone; + } + + public int SegmentIndexBytes { get { return GetProperty(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG); } set { SetProperty(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, value); } } + + public TopicConfigBuilder WithSegmentIndexBytes(int segmentIndexBytes) + { + var clone = Clone(); + clone.SegmentIndexBytes = segmentIndexBytes; + return clone; + } + + public int FlushMessageInterval { get { return GetProperty(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG); } set { SetProperty(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, value); } } + + public TopicConfigBuilder WithFlushMessageInterval(int flushMessageInterval) + { + var clone = Clone(); + clone.FlushMessageInterval = flushMessageInterval; + return clone; + } + + public int FlushMs { get { return GetProperty(TopicConfig.FLUSH_MS_CONFIG); } set { SetProperty(TopicConfig.FLUSH_MS_CONFIG, value); } } + + public TopicConfigBuilder WithFlushMs(int flushMs) + { + var clone = Clone(); + clone.FlushMs = flushMs; + return clone; + } + + public int RetentionBytes { get { return GetProperty(TopicConfig.RETENTION_BYTES_CONFIG); } set { SetProperty(TopicConfig.RETENTION_BYTES_CONFIG, value); } } + + public TopicConfigBuilder WithRetentionBytes(int retentionBytes) + { + var clone = Clone(); + clone.RetentionBytes = retentionBytes; + return clone; + } + + public int RetentionMs { get { return GetProperty(TopicConfig.RETENTION_MS_CONFIG); } set { SetProperty(TopicConfig.RETENTION_MS_CONFIG, value); } } + + public TopicConfigBuilder WithRetentionMs(int retentionMs) + { + var clone = Clone(); + clone.RetentionMs = retentionMs; + return clone; + } + + public bool RemoteLogStorageEnable { get { return GetProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); } set { SetProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, value); } } + + public TopicConfigBuilder WithRemoteLogStorageEnable(bool remoteLogStorageEnable) + { + var clone = Clone(); + clone.RemoteLogStorageEnable = remoteLogStorageEnable; + return clone; + } + + public int LocalLogRetentionMs { get { return GetProperty(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); } set { SetProperty(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, value); } } + + public TopicConfigBuilder WithLocalLogRetentionMs(int localLogRetentionMs) + { + var clone = Clone(); + clone.LocalLogRetentionMs = localLogRetentionMs; + return clone; + } + + public int LocalLogRetentionBytes { get { return GetProperty(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); } set { SetProperty(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, value); } } + + public TopicConfigBuilder WithLocalLogRetentionBytes(int localLogRetentionBytes) + { + var clone = Clone(); + clone.LocalLogRetentionBytes = localLogRetentionBytes; + return clone; + } + + public int MaxMessageBytes { get { return GetProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG); } set { SetProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, value); } } + + public TopicConfigBuilder WithMaxMessageBytes(int maxMessageBytes) + { + var clone = Clone(); + clone.MaxMessageBytes = maxMessageBytes; + return clone; + } + + public int IndexIntervalBytes { get { return GetProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG); } set { SetProperty(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, value); } } + + public TopicConfigBuilder WithIndexIntervalBytes(int indexIntervalBytes) + { + var clone = Clone(); + clone.IndexIntervalBytes = indexIntervalBytes; + return clone; + } + + public int FileDeleteDelayMs { get { return GetProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG); } set { SetProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, value); } } + + public TopicConfigBuilder WithFileDeleteDelayMs(int fileDeleteDelayMs) + { + var clone = Clone(); + clone.FileDeleteDelayMs = fileDeleteDelayMs; + return clone; + } + + public int DeleteRetentionMs { get { return GetProperty(TopicConfig.DELETE_RETENTION_MS_CONFIG); } set { SetProperty(TopicConfig.DELETE_RETENTION_MS_CONFIG, value); } } + + public TopicConfigBuilder WithDeleteRetentionMs(int deleteRetentionMs) + { + var clone = Clone(); + clone.DeleteRetentionMs = deleteRetentionMs; + return clone; + } + + public int MinCompactationLagMs { get { return GetProperty(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG); } set { SetProperty(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, value); } } + + public TopicConfigBuilder WithMinCompactationLagMs(int minCompactationLagMs) + { + var clone = Clone(); + clone.MinCompactationLagMs = minCompactationLagMs; + return clone; + } + + public int MaxCompactationLagMs { get { return GetProperty(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); } set { SetProperty(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, value); } } + + public TopicConfigBuilder WithMaxCompactationLagMs(int maxCompactationLagMs) + { + var clone = Clone(); + clone.MaxCompactationLagMs = maxCompactationLagMs; + return clone; + } + + public int MinCleanableDirtyRatio { get { return GetProperty(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG); } set { SetProperty(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, value); } } + + public TopicConfigBuilder WithMinCleanableDirtyRatio(int minCleanableDirtyRatio) + { + var clone = Clone(); + clone.MinCleanableDirtyRatio = minCleanableDirtyRatio; + return clone; + } + + public TopicConfig.CleanupPolicy CleanupPolicy + { + get + { + var policyStr = GetProperty(TopicConfig.CLEANUP_POLICY_CONFIG); + TopicConfig.CleanupPolicy policy = TopicConfig.CleanupPolicy.None; + if (policyStr.Contains(TopicConfig.CLEANUP_POLICY_COMPACT)) policy |= TopicConfig.CleanupPolicy.Compact; + if (policyStr.Contains(TopicConfig.CLEANUP_POLICY_DELETE)) policy |= TopicConfig.CleanupPolicy.Delete; + return policy; + } + set + { + if (value == TopicConfig.CleanupPolicy.None) return; + var str = string.Empty; + if (value.HasFlag(TopicConfig.CleanupPolicy.Compact)) str += TopicConfig.CLEANUP_POLICY_COMPACT; + if (value.HasFlag(TopicConfig.CleanupPolicy.Delete)) str += TopicConfig.CLEANUP_POLICY_DELETE; + SetProperty(TopicConfig.CLEANUP_POLICY_CONFIG, str); + } + } + + public TopicConfigBuilder WithCleanupPolicy(TopicConfig.CleanupPolicy cleanupPolicy) + { + var clone = Clone(); + clone.CleanupPolicy = cleanupPolicy; + return clone; + } + + public bool UncleanLeaderElectionEnable { get { return GetProperty(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG); } set { SetProperty(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value); } } + + public TopicConfigBuilder WithUncleanLeaderElectionEnable(bool uncleanLeaderElectionEnable) + { + var clone = Clone(); + clone.UncleanLeaderElectionEnable = uncleanLeaderElectionEnable; + return clone; + } + + public int MinInSyncReplicas { get { return GetProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG); } set { SetProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, value); } } + + public TopicConfigBuilder WithMinInSyncReplicas(int minInSyncReplicas) + { + var clone = Clone(); + clone.MinInSyncReplicas = minInSyncReplicas; + return clone; + } + + public TopicConfig.CompressionType CompressionType + { + get + { + var strName = GetProperty(TopicConfig.COMPRESSION_TYPE_CONFIG); + if (System.Enum.TryParse(strName, out var rest)) + { + return rest; + } + return TopicConfig.CompressionType.producer; + } + set + { + SetProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, System.Enum.GetName(typeof(TopicConfig.CompressionType), value).ToLowerInvariant()); + } + } + + public TopicConfigBuilder WithCompressionType(TopicConfig.CompressionType compressionType) + { + var clone = Clone(); + clone.CompressionType = compressionType; + return clone; + } + + public bool Preallocate { get { return GetProperty(TopicConfig.PREALLOCATE_CONFIG); } set { SetProperty(TopicConfig.PREALLOCATE_CONFIG, value); } } + + public TopicConfigBuilder WithPreallocate(bool preallocate) + { + var clone = Clone(); + clone.Preallocate = preallocate; + return clone; + } + + public TopicConfig.MessageTimestampType MessageTimestampType + { + get + { + var strName = GetProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG); + if (System.Enum.TryParse(strName, out var rest)) + { + return rest; + } + return TopicConfig.MessageTimestampType.CreateTime; + } + set + { + SetProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, System.Enum.GetName(typeof(TopicConfig.MessageTimestampType), value)); + } + } + + public TopicConfigBuilder WithCompressionType(TopicConfig.MessageTimestampType messageTimestampType) + { + var clone = Clone(); + clone.MessageTimestampType = messageTimestampType; + return clone; + } + + public int MessageTimestampDifferenceMaxMs { get { return GetProperty(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG); } set { SetProperty(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, value); } } + + public TopicConfigBuilder WithMessageTimestampDifferenceMaxMs(int messageTimestampDifferenceMaxMs) + { + var clone = Clone(); + clone.MessageTimestampDifferenceMaxMs = messageTimestampDifferenceMaxMs; + return clone; + } + + public bool MessageDownConversionEnable { get { return GetProperty(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG); } set { SetProperty(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, value); } } + + public TopicConfigBuilder WithMessageDownConversionEnable(bool messageDownConversionEnable) + { + var clone = Clone(); + clone.MessageDownConversionEnable = messageDownConversionEnable; + return clone; + } + } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Connect/Data/Schema.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Connect/Data/Schema.cs index 2f75e29949..454e7bd3eb 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Connect/Data/Schema.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Connect/Data/Schema.cs @@ -122,15 +122,8 @@ public Schema() { } [System.Obsolete("This is not public in Apache Kafka API")] [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] - public Schema(Type type) - : base(type) - { - } - - [System.Obsolete("This is not public in Apache Kafka API")] - [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] - public Schema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc, Map parameters, List fields, Schema keySchema, Schema valueSchema) - : base(type, optional, defaultValue, name, version, doc, parameters, fields, keySchema, valueSchema) + public Schema(params object[] args) + : base(args) { } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/Annotation/Annotation.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/Annotation/Annotation.cs new file mode 100644 index 0000000000..bf67e95f82 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/Annotation/Annotation.cs @@ -0,0 +1,29 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KafkaBridge.Java.Lang +{ + public class Annotation : JVMBridgeBase + { + public override string ClassName => "java.lang.annotation.Annotation"; + + public Class AnnotationType => IExecute("annotationType"); + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/Class.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/Class.cs new file mode 100644 index 0000000000..2868093af2 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/Class.cs @@ -0,0 +1,79 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; +using MASES.JCOBridge.C2JBridge.JVMInterop; + +namespace MASES.KafkaBridge.Java.Lang +{ + public class Class : JVMBridgeBase + { + public override string ClassName => "java.lang.Class"; + + public static Class ForName(string className) + { + return SExecute("forName", className); + } + + public static Class ForName() where T : IJVMBridgeBase, new() + { + var className = new T().ClassName; + var exType = SExecute("forName", className); + return SExecute>("forName", className); + } + + public string Name => IExecute("getName"); + + public bool IsAnnotation => IExecute("isAnnotation"); + + // TO BE VERIFIED + //public bool IsAnnotationPresent(Class annotationClass) + // where T : Annotation + //{ + // return IExecute("isAnnotationPresent", annotationClass.JVMType); + //} + + public bool IsAnonymousClass => IExecute("isAnonymousClass"); + + public bool IsArray => IExecute("isArray"); + + public bool IsAssignableFrom(Class cls) => IExecute("isAssignableFrom", cls); + + public bool IsEnum => IExecute("isEnum"); + + public bool IsInstance(object obj) => IExecute("isInstance", obj); + + public bool IsInterface => IExecute("isInterface"); + + public bool IsLocalClass => IExecute("isLocalClass"); + + public bool IsMemberClass => IExecute("isMemberClass"); + + public bool IsPrimitive => IExecute("isPrimitive"); + + public bool IsSynthetic => IExecute("isSynthetic"); + } + + public class Class : Class + where T : IJVMBridgeBase, new() + { + + + public IJavaType JVMType => (new T() as IJVMBridgeBaseStatic).Clazz; + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/AbstractMap.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/AbstractMap.cs new file mode 100644 index 0000000000..866575058e --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/AbstractMap.cs @@ -0,0 +1,35 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +namespace MASES.KafkaBridge.Java.Util +{ + public class AbstractMap : Map + { + public override string ClassName => "java.util.AbstractMap"; + + public AbstractMap() + { + } + + public AbstractMap(params object[] args) + : base(args) + { + + } + } +} \ No newline at end of file diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Dictionary.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Dictionary.cs new file mode 100644 index 0000000000..d754790bb8 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Dictionary.cs @@ -0,0 +1,27 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KafkaBridge.Java.Util +{ + public class Dictionary : JVMBridgeBase> + { + public override string ClassName => "java.util.Dictionary"; + } +} \ No newline at end of file diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/HashMap.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/HashMap.cs new file mode 100644 index 0000000000..5bcc3ffca9 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/HashMap.cs @@ -0,0 +1,44 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +namespace MASES.KafkaBridge.Java.Util +{ + public class HashMap : AbstractMap + { + public override string ClassName => "java.util.HashMap"; + + public HashMap() + { + } + + public HashMap(int initialCapacity) + : base(initialCapacity) + { + } + + public HashMap(int initialCapacity, float loadFactor) + : base(initialCapacity, loadFactor) + { + } + + public HashMap(Map m) + : base(m) + { + } + } +} \ No newline at end of file diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Hashtable.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Hashtable.cs new file mode 100644 index 0000000000..39bf5f010e --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Hashtable.cs @@ -0,0 +1,29 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KafkaBridge.Java.Util +{ + public class Hashtable : Dictionary + { + public override string ClassName => "java.util.Hashtable"; + + public static implicit operator Map(Hashtable table) { return Wraps>(table.Instance); } + } +} \ No newline at end of file diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Iterator.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Iterator.cs new file mode 100644 index 0000000000..0462bfee67 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Iterator.cs @@ -0,0 +1,31 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KafkaBridge.Java.Util +{ + public class Iterator : JVMBridgeBase> + { + public override string ClassName => "java.util.Iterator"; + + public bool HasNext => IExecute("hasNext"); + + public E Next => IExecute("next"); + } +} \ No newline at end of file diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Map.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Map.cs index 5c590c3fcc..b92431f1d0 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Map.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/Map.cs @@ -24,15 +24,33 @@ namespace MASES.KafkaBridge.Java.Util public class Map : JVMBridgeBase { public override string ClassName => "java.util.Map"; + + public Map() + { + } + + public Map(params object[] args) + : base(args) + { + } } public class Map : Map { public override string ClassName => "java.util.Map"; + public Map() + { + } + + public Map(params object[] args) + : base(args) + { + } + public virtual V Get​(K key) { return IExecute("get", key); } - public virtual V Put​(K key, V value) + public virtual V Put​(K key, V value) { var obj = IExecute("put", key, value); if (value is IJVMBridgeBase) diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/TreeMap.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/TreeMap.cs new file mode 100644 index 0000000000..9615d8e4e7 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/TreeMap.cs @@ -0,0 +1,29 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +namespace MASES.KafkaBridge.Java.Util +{ + public class TreeMap : AbstractMap + { + public override string ClassName => "java.util.TreeMap"; + + public TreeMap() + { + } + } +} \ No newline at end of file diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/UUID.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/UUID.cs similarity index 92% rename from src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/UUID.cs rename to src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/UUID.cs index 33c7af7054..46c20762d9 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Lang/UUID.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Java/Util/UUID.cs @@ -22,6 +22,6 @@ public sealed class UUID : JCOBridge.C2JBridge.JVMBridgeBase { public override bool IsStatic => true; - public override string ClassName => "java.lang.UUID"; + public override string ClassName => "java.util.UUID"; } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/ForeachProcessor.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/ForeachProcessor.cs new file mode 100644 index 0000000000..68b6a3da79 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/ForeachProcessor.cs @@ -0,0 +1,38 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class ForeachProcessor : JVMBridgeBase> + { + public override string ClassName => "org.apache.kafka.streams.kstream.ForeachProcessor"; + + [System.Obsolete("This is not public in Apache Kafka API")] + [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] + public ForeachProcessor() + { + } + + public ForeachProcessor(ForeachAction action) + : base(action) + { + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Internals/TimeWindow.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Internals/TimeWindow.cs new file mode 100644 index 0000000000..2412f55d98 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Internals/TimeWindow.cs @@ -0,0 +1,35 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +namespace MASES.KafkaBridge.Streams.KStream.Internals +{ + public class TimeWindow : Window + { + public override string ClassName => "org.apache.kafka.streams.kstream.internals.TimeWindow"; + + [System.Obsolete("This is not public in Apache Kafka API")] + [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] + public TimeWindow() { } + + public TimeWindow(long startMs, long endMs) + : base(startMs, endMs) + { + } + + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Internals/UnlimitedWindow.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Internals/UnlimitedWindow.cs new file mode 100644 index 0000000000..c96f7483bb --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Internals/UnlimitedWindow.cs @@ -0,0 +1,35 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +namespace MASES.KafkaBridge.Streams.KStream.Internals +{ + public class UnlimitedWindow : Window + { + public override string ClassName => "org.apache.kafka.streams.kstream.internals.UnlimitedWindow"; + + [System.Obsolete("This is not public in Apache Kafka API")] + [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] + public UnlimitedWindow() { } + + public UnlimitedWindow(long startMs) + : base(startMs, long.MaxValue) + { + } + + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/SessionWindowedDeserializer.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/SessionWindowedDeserializer.cs new file mode 100644 index 0000000000..db646b5a0a --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/SessionWindowedDeserializer.cs @@ -0,0 +1,47 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Common.Serialization; +using MASES.KafkaBridge.Java.Util; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class SessionWindowedDeserializer : JCOBridge.C2JBridge.JVMBridgeBase> + { + public override bool IsCloseable => true; + + public override string ClassName => "org.apache.kafka.streams.kstream.SessionWindowedDeserializer"; + + public SessionWindowedDeserializer() { } + + public SessionWindowedDeserializer(Deserializer inner) + : base(inner) + { + } + + public void Configure(Map configs, bool isKey) + { + IExecute("configure", configs, isKey); + } + + public Windowed Deserialize(string topic, byte[] data) + { + return IExecute>("deserialize", topic, data); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/SessionWindowedSerializer.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/SessionWindowedSerializer.cs new file mode 100644 index 0000000000..0decc1de8b --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/SessionWindowedSerializer.cs @@ -0,0 +1,52 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Common.Serialization; +using MASES.KafkaBridge.Java.Util; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class SessionWindowedSerializer : JCOBridge.C2JBridge.JVMBridgeBase> + { + public override bool IsCloseable => true; + + public override string ClassName => "org.apache.kafka.streams.kstream.SessionWindowedSerializer"; + + public SessionWindowedSerializer() { } + + public SessionWindowedSerializer(Serializer inner) + : base(inner) + { + } + + public void Configure(Map configs, bool isKey) + { + IExecute("configure", configs, isKey); + } + + public byte[] Serialize(string topic, Windowed data) + { + return IExecute("serialize", topic, data); + } + + public byte[] SerializeBaseKey(string topic, Windowed data) + { + return IExecute("serializeBaseKey", topic, data); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TableJoined.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TableJoined.cs new file mode 100644 index 0000000000..d3099f1f3f --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TableJoined.cs @@ -0,0 +1,54 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; +using MASES.KafkaBridge.Java.Lang; +using MASES.KafkaBridge.Streams.Processor; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class TableJoined : JVMBridgeBase>, INamedOperation> + { + public override string ClassName => "org.apache.kafka.streams.kstream.TableJoined"; + + public static TableJoined With(StreamPartitioner partitioner, StreamPartitioner otherPartitioner) + { + return SExecute>("with", partitioner, otherPartitioner); + } + + public static TableJoined As(string name) + { + return SExecute>("as", name); + } + + public TableJoined WithPartitioner(StreamPartitioner partitioner) + { + return IExecute>("withPartitioner", partitioner); + } + + public TableJoined WithOtherPartitioner(StreamPartitioner otherPartitioner) + { + return IExecute>("withOtherPartitioner", otherPartitioner); + } + + public TableJoined WithName(string name) + { + return IExecute>("withName", name); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindowedDeserializer.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindowedDeserializer.cs new file mode 100644 index 0000000000..dd97f7cdc3 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindowedDeserializer.cs @@ -0,0 +1,54 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Common.Serialization; +using MASES.KafkaBridge.Java.Util; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class TimeWindowedDeserializer : JCOBridge.C2JBridge.JVMBridgeBase> + { + public override bool IsCloseable => true; + + public override string ClassName => "org.apache.kafka.streams.kstream.TimeWindowedDeserializer"; + + public TimeWindowedDeserializer() { } + + public TimeWindowedDeserializer(Deserializer inner, long windowSize) + : base(inner, windowSize) + { + } + + public long WindowSize => IExecute("getWindowSize"); + + public void Configure(Map configs, bool isKey) + { + IExecute("configure", configs, isKey); + } + + public Windowed Deserialize(string topic, byte[] data) + { + return IExecute>("deserialize", topic, data); + } + + public void SetIsChangelogTopic(bool isChangelogTopic) + { + IExecute("setIsChangelogTopic", isChangelogTopic); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindowedSerializer.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindowedSerializer.cs new file mode 100644 index 0000000000..dd5533737f --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindowedSerializer.cs @@ -0,0 +1,52 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Common.Serialization; +using MASES.KafkaBridge.Java.Util; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class TimeWindowedSerializer : JCOBridge.C2JBridge.JVMBridgeBase> + { + public override bool IsCloseable => true; + + public override string ClassName => "org.apache.kafka.streams.kstream.TimeWindowedSerializer"; + + public TimeWindowedSerializer() { } + + public TimeWindowedSerializer(Serializer inner) + : base(inner) + { + } + + public void Configure(Map configs, bool isKey) + { + IExecute("configure", configs, isKey); + } + + public byte[] Serialize(string topic, Windowed data) + { + return IExecute("serialize", topic, data); + } + + public byte[] SerializeBaseKey(string topic, Windowed data) + { + return IExecute("serializeBaseKey", topic, data); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindows.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindows.cs new file mode 100644 index 0000000000..bb6a36d1b7 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/TimeWindows.cs @@ -0,0 +1,43 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Java.Time; +using MASES.KafkaBridge.Streams.KStream.Internals; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class TimeWindows : Windows + { + public override string ClassName => "org.apache.kafka.streams.kstream.TimeWindows"; + + public static TimeWindows OfSizeWithNoGrace(Duration size) + { + return SExecute("ofSizeWithNoGrace", size); + } + + public static TimeWindows OfSizeAndGrace(Duration size, Duration afterWindowEnd) + { + return SExecute("ofSizeAndGrace", size, afterWindowEnd); + } + + public TimeWindows AdvanceBy(Duration advance) + { + return IExecute("advanceBy", advance); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/UnlimitedWindows.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/UnlimitedWindows.cs new file mode 100644 index 0000000000..311338b47b --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/UnlimitedWindows.cs @@ -0,0 +1,38 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Java.Time; +using MASES.KafkaBridge.Streams.KStream.Internals; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class UnlimitedWindows : Windows + { + public override string ClassName => "org.apache.kafka.streams.kstream.UnlimitedWindows"; + + public static UnlimitedWindows Of() + { + return SExecute("of"); + } + + public UnlimitedWindows StartOn(Instant start) + { + return IExecute("startOn", start); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Window.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Window.cs index 9c18c8de66..d838bcfe3e 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Window.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Window.cs @@ -33,12 +33,17 @@ public Window(long startMs, long endMs) { } - public virtual long Start() => IExecute("start"); + public virtual long Start => IExecute("start"); public virtual long End => IExecute("end"); - public virtual Instant startTime => IExecute("startTime"); + public virtual Instant StartTime => IExecute("startTime"); public virtual Instant EndTime => IExecute("endTime"); + + public bool Overlap(Window other) + { + return IExecute("overlap", other); + } } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Windowed.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Windowed.cs index e511e50787..7d7b645288 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Windowed.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/Windowed.cs @@ -18,33 +18,35 @@ namespace MASES.KafkaBridge.Streams.KStream { - public class Windowed : JCOBridge.C2JBridge.JVMBridgeBase> + public class Windowed : JCOBridge.C2JBridge.JVMBridgeBase { public override string ClassName => "org.apache.kafka.streams.kstream.Windowed"; [System.Obsolete("This is not public in Apache Kafka API")] [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] - public Windowed() { } + public Windowed() + { + } - public Windowed(K key, Window window) - : base(key, window) + public Windowed(object key, Window window) + : base(key, window) { } - public K Key => IExecute("key"); + public object Key => IExecute("key"); public Window Window => IExecute("window"); } - public class Windowed : Windowed + public class Windowed : Windowed { - public Windowed() - { - } + public Windowed() { } - public Windowed(object key, Window window) - : base(key, window) + public Windowed(K key, Window window) + : base(key, window) { } + + public new K Key => IExecute("key"); } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/WindowedSerdes.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/WindowedSerdes.cs new file mode 100644 index 0000000000..3c49d34d84 --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/KStream/WindowedSerdes.cs @@ -0,0 +1,40 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; +using MASES.KafkaBridge.Common.Serialization; +using MASES.KafkaBridge.Java.Lang; + +namespace MASES.KafkaBridge.Streams.KStream +{ + public class WindowedSerdes : JVMBridgeBase + { + public override string ClassName => "org.apache.kafka.streams.kstream.WindowedSerdes"; + + + public static Serde> TimeWindowedSerdeFrom(Class type, long windowSize) where T : IJVMBridgeBase, new() + { + return SExecute>>("timeWindowedSerdeFrom", type.JVMType, windowSize); + } + + public static Serde> SessionWindowedSerdeFrom(Class type) where T : IJVMBridgeBase, new() + { + return SExecute>>("sessionWindowedSerdeFrom", type.JVMType); + } + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/KeyValueIterator.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/KeyValueIterator.cs index 511af62cc7..da51185a47 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/KeyValueIterator.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/KeyValueIterator.cs @@ -17,13 +17,16 @@ */ using MASES.JCOBridge.C2JBridge; +using MASES.KafkaBridge.Java.Util; namespace MASES.KafkaBridge.Streams.State { - public class KeyValueIterator : JVMBridgeBaseEnumerable> + public class KeyValueIterator : JVMBridgeBaseEnumerable, KeyValue> { public override string ClassName => "org.apache.kafka.streams.state.KeyValueIterator"; + public static implicit operator Iterator>(KeyValueIterator keyValueIterator) { return Wraps>>(keyValueIterator.Instance); } + public virtual K PeekNextKey => IExecute("peekNextKey"); } } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/QueryableStoreTypes.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/QueryableStoreTypes.cs new file mode 100644 index 0000000000..f64e8c475e --- /dev/null +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/QueryableStoreTypes.cs @@ -0,0 +1,37 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KafkaBridge.Streams.State +{ + public class QueryableStoreTypes : JVMBridgeBase + { + public override string ClassName => "org.apache.kafka.streams.state.QueryableStoreTypes"; + + public static QueryableStoreType> KeyValueStore() => SExecute>>("keyValueStore"); + + public static QueryableStoreType>> TimestampedKeyValueStore() => SExecute>>>("timestampedKeyValueStore"); + + public static QueryableStoreType> WindowStore() => SExecute>>("windowStore"); + + public static QueryableStoreType>> TimestampedWindowStore() => SExecute>>>("timestampedWindowStore"); + + public static QueryableStoreType> SessionStore() => SExecute>>("sessionStore"); + } +} diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/ReadOnlyKeyValueStore.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/ReadOnlyKeyValueStore.cs index efe1c5b04c..f59891f294 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/ReadOnlyKeyValueStore.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/ReadOnlyKeyValueStore.cs @@ -30,4 +30,23 @@ public interface IReadOnlyKeyValueStore : IJVMBridgeBase long ApproximateNumEntries { get; } } + + public class ReadOnlyKeyValueStore : JVMBridgeBase, IReadOnlyKeyValueStore>, IReadOnlyKeyValueStore + { + public override string ClassName => "org.apache.kafka.streams.state.ReadOnlyKeyValueStore"; + + public KeyValueIterator All => IExecute>("all"); + + public long ApproximateNumEntries => IExecute("approximateNumEntries"); + + public V Get(K key) + { + return IExecute("get", key); + } + + public KeyValueIterator Range(K from, K to) + { + return IExecute>("range", from, to); + } + } } \ No newline at end of file diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/Stores.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/Stores.cs index 40ca944b3f..7a583f0ce1 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/Stores.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/State/Stores.cs @@ -25,24 +25,24 @@ public class Stores : JCOBridge.C2JBridge.JVMBridgeBase { public override string ClassName => "org.apache.kafka.streams.state.Stores"; - public static WindowBytesStoreSupplier PersistentKeyValueStore(string name) + public static KeyValueBytesStoreSupplier PersistentKeyValueStore(string name) { - return SExecute("persistentKeyValueStore", name); + return SExecute("persistentKeyValueStore", name); } - public static WindowBytesStoreSupplier PersistentTimestampedKeyValueStore(string name) + public static KeyValueBytesStoreSupplier PersistentTimestampedKeyValueStore(string name) { - return SExecute("persistentTimestampedKeyValueStore", name); + return SExecute("persistentTimestampedKeyValueStore", name); } - public static WindowBytesStoreSupplier InMemoryKeyValueStore(string name) + public static KeyValueBytesStoreSupplier InMemoryKeyValueStore(string name) { - return SExecute("inMemoryKeyValueStore", name); + return SExecute("inMemoryKeyValueStore", name); } - public static WindowBytesStoreSupplier LruMap(string name, int maxCacheSize) + public static KeyValueBytesStoreSupplier LruMap(string name, int maxCacheSize) { - return SExecute("lruMap", name, maxCacheSize); + return SExecute("lruMap", name, maxCacheSize); } @@ -82,37 +82,37 @@ public static SessionBytesStoreSupplier InMemorySessionStore(string name, Durati } - public static StoreBuilder> KeyValueStoreBuilder(WindowBytesStoreSupplier supplier, - Serde keySerde, - Serde valueSerde) + public static StoreBuilder> KeyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, + Serde keySerde, + Serde valueSerde) { return SExecute>>("keyValueStoreBuilder", supplier, keySerde, valueSerde); } - public static StoreBuilder> TimestampedKeyValueStoreBuilder(WindowBytesStoreSupplier supplier, - Serde keySerde, - Serde valueSerde) + public static StoreBuilder> TimestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier supplier, + Serde keySerde, + Serde valueSerde) { return SExecute>>("timestampedKeyValueStoreBuilder", supplier, keySerde, valueSerde); } public static StoreBuilder> WindowStoreBuilder(WindowBytesStoreSupplier supplier, - Serde keySerde, - Serde valueSerde) + Serde keySerde, + Serde valueSerde) { return SExecute>>("windowStoreBuilder", supplier, keySerde, valueSerde); } public static StoreBuilder> TimestampedWindowStoreBuilder(WindowBytesStoreSupplier supplier, - Serde keySerde, - Serde valueSerde) + Serde keySerde, + Serde valueSerde) { return SExecute>>("timestampedWindowStoreBuilder", supplier, keySerde, valueSerde); } public static StoreBuilder> SessionStoreBuilder(SessionBytesStoreSupplier supplier, - Serde keySerde, - Serde valueSerde) + Serde keySerde, + Serde valueSerde) { return SExecute>>("sessionStoreBuilder", supplier, keySerde, valueSerde); } diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs index 2fbbec84f6..80068a4f36 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StoreQueryParameters.cs @@ -29,9 +29,9 @@ public StoreQueryParameters() { } - public static StoreQueryParameters FromNameAndType(string storeName, QueryableStoreType queryableStoreType) + public static StoreQueryParameters FromNameAndType(string storeName, QueryableStoreType queryableStoreType) { - return SExecute>("fromNameAndType", storeName, queryableStoreType); + return SExecute>("fromNameAndType", storeName, queryableStoreType); } public StoreQueryParameters WithPartition(int partition) diff --git a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StreamsConfig.cs b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StreamsConfig.cs index 6a3e334d53..3dbffc3db3 100644 --- a/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StreamsConfig.cs +++ b/src/net/KafkaBridge/ClientSide/BridgedClasses/Streams/StreamsConfig.cs @@ -16,11 +16,13 @@ * Refer to LICENSE for more information. */ +using MASES.KafkaBridge.Clients; +using MASES.KafkaBridge.Common.Config; using MASES.KafkaBridge.Java.Util; namespace MASES.KafkaBridge.Streams { - public class StreamsConfig : JCOBridge.C2JBridge.JVMBridgeBase + public class StreamsConfig : AbstractConfig { public override string ClassName => "org.apache.kafka.streams.StreamsConfig"; @@ -182,4 +184,262 @@ public StreamsConfig(Map props) { } } + + public class StreamsConfigBuilder : CommonClientConfigsBuilder + { + public string ApplicationId { get { return GetProperty(StreamsConfig.APPLICATION_ID_CONFIG); } set { SetProperty(StreamsConfig.APPLICATION_ID_CONFIG, value); } } + + public StreamsConfigBuilder WithApplicationId(string applicationId) + { + var clone = Clone(); + clone.ApplicationId = applicationId; + return clone; + } + + public int NumStandByReplicas { get { return GetProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); } set { SetProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, value); } } + + public StreamsConfigBuilder WithNumStandByReplicas(int numStandByReplicas) + { + var clone = Clone(); + clone.NumStandByReplicas = numStandByReplicas; + return clone; + } + + public string StateDir { get { return GetProperty(StreamsConfig.STATE_DIR_CONFIG); } set { SetProperty(StreamsConfig.STATE_DIR_CONFIG, value); } } + + public StreamsConfigBuilder WithStateDir(string stateDir) + { + var clone = Clone(); + clone.StateDir = stateDir; + return clone; + } + + public long AcceptableRecoveryLag { get { return GetProperty(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); } set { SetProperty(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, value); } } + + public StreamsConfigBuilder WithAcceptableRecoveryLag(long acceptableRecoveryLag) + { + var clone = Clone(); + clone.AcceptableRecoveryLag = acceptableRecoveryLag; + return clone; + } + + public long CacheMaxBytesBuffering { get { return GetProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); } set { SetProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, value); } } + + public StreamsConfigBuilder WithCacheMaxBytesBuffering(long cacheMaxBytesBuffering) + { + var clone = Clone(); + clone.CacheMaxBytesBuffering = cacheMaxBytesBuffering; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultDeserializationExceptionHandlerClass { get { return GetProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG); } set { SetProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public StreamsConfigBuilder WithDefaultDeserializationExceptionHandlerClass(dynamic defaultDeserializationExceptionHandlerClass) + { + var clone = Clone(); + clone.DefaultDeserializationExceptionHandlerClass = defaultDeserializationExceptionHandlerClass; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultKeySerdeClass { get { return GetProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG); } set { SetProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public StreamsConfigBuilder WithDefaultKeySerdeClass(dynamic defaultKeySerdeClass) + { + var clone = Clone(); + clone.DefaultKeySerdeClass = defaultKeySerdeClass; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultProductionExceptionHandlerClass { get { return GetProperty(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG); } set { SetProperty(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public StreamsConfigBuilder WithDefaultProductionExceptionHandlerClass(dynamic defaultProductionExceptionHandlerClass) + { + var clone = Clone(); + clone.DefaultProductionExceptionHandlerClass = defaultProductionExceptionHandlerClass; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultTimestampExtractorClass { get { return GetProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG); } set { SetProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public StreamsConfigBuilder WithDefaultTimestampExtractorClass(dynamic defaultTimestampExtractorClass) + { + var clone = Clone(); + clone.DefaultTimestampExtractorClass = defaultTimestampExtractorClass; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic DefaultValueSerdeClass { get { return GetProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG); } set { SetProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public StreamsConfigBuilder WithDefaultValueSerdeClass(dynamic defaultValueSerdeClass) + { + var clone = Clone(); + clone.DefaultValueSerdeClass = defaultValueSerdeClass; + return clone; + } + + public long MaxTaskIdleMs { get { return GetProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); } set { SetProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, value); } } + + public StreamsConfigBuilder WithMaxTaskIdleMs(long maxTaskIdleMs) + { + var clone = Clone(); + clone.MaxTaskIdleMs = maxTaskIdleMs; + return clone; + } + + public int MaxWarmupReplicas { get { return GetProperty(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); } set { SetProperty(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, value); } } + + public StreamsConfigBuilder WithMaxWarmupReplicas(int maxWarmupReplicas) + { + var clone = Clone(); + clone.MaxWarmupReplicas = maxWarmupReplicas; + return clone; + } + + public int NumStreamThreads { get { return GetProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG); } set { SetProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, value); } } + + public StreamsConfigBuilder WithNumStreamThreads(int numStreamThreads) + { + var clone = Clone(); + clone.NumStreamThreads = numStreamThreads; + return clone; + } + + public int ReplicationFactor { get { return GetProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG); } set { SetProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, value); } } + + public StreamsConfigBuilder WithReplicationFactor(int replicationFactor) + { + var clone = Clone(); + clone.ReplicationFactor = replicationFactor; + return clone; + } + + public long TaskTimeoutMs { get { return GetProperty(StreamsConfig.TASK_TIMEOUT_MS_CONFIG); } set { SetProperty(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, value); } } + + public StreamsConfigBuilder WithTaskTimeoutMs(long taskTimeoutMs) + { + var clone = Clone(); + clone.TaskTimeoutMs = taskTimeoutMs; + return clone; + } + + public bool TopologyOptimization { get { return GetProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG) == StreamsConfig.OPTIMIZE; } set { SetProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, value ? StreamsConfig.OPTIMIZE : StreamsConfig.NO_OPTIMIZATION); } } + + public StreamsConfigBuilder WithTopologyOptimization(bool topologyOptimization) + { + var clone = Clone(); + clone.TopologyOptimization = topologyOptimization; + return clone; + } + + public string ApplicationServer { get { return GetProperty(StreamsConfig.APPLICATION_SERVER_CONFIG); } set { SetProperty(StreamsConfig.APPLICATION_SERVER_CONFIG, value); } } + + public StreamsConfigBuilder WithApplicationServer(string applicationServer) + { + var clone = Clone(); + clone.ApplicationServer = applicationServer; + return clone; + } + + public int BufferedRecordsPerPartition { get { return GetProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); } set { SetProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, value); } } + + public StreamsConfigBuilder WithBufferedRecordsPerPartition(int bufferedRecordsPerPartition) + { + var clone = Clone(); + clone.BufferedRecordsPerPartition = bufferedRecordsPerPartition; + return clone; + } + + public string BuiltInMetricsVersion { get { return GetProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG); } set { SetProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, value); } } + + public StreamsConfigBuilder WithBuiltInMetricsVersion(string builtInMetricsVersion) + { + var clone = Clone(); + clone.BuiltInMetricsVersion = builtInMetricsVersion; + return clone; + } + + public long CommitIntervalMs { get { return GetProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); } set { SetProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, value); } } + + public StreamsConfigBuilder WithCommitIntervalMs(long commitIntervalMs) + { + var clone = Clone(); + clone.CommitIntervalMs = commitIntervalMs; + return clone; + } + + public long ProbingRebalanceIntervalMs { get { return GetProperty(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); } set { SetProperty(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, value); } } + + public StreamsConfigBuilder WithProbingRebalanceIntervalMs(long probingRebalanceIntervalMs) + { + var clone = Clone(); + clone.ProbingRebalanceIntervalMs = probingRebalanceIntervalMs; + return clone; + } + + [System.Obsolete("To be checked")] + public dynamic RocksDbConfigSetterClass { get { return GetProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG); } set { SetProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, value); } } + + [System.Obsolete("To be checked")] + public StreamsConfigBuilder WithRocksDbConfigSetterClass(dynamic rocksDbConfigSetterClass) + { + var clone = Clone(); + clone.RocksDbConfigSetterClass = rocksDbConfigSetterClass; + return clone; + } + + public long StateCleanupDelayMs { get { return GetProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); } set { SetProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, value); } } + + public StreamsConfigBuilder WithStateCleanupDelayMs(long stateCleanupDelayMs) + { + var clone = Clone(); + clone.StateCleanupDelayMs = stateCleanupDelayMs; + return clone; + } + + public string UpgradeFrom { get { return GetProperty(StreamsConfig.UPGRADE_FROM_CONFIG); } set { SetProperty(StreamsConfig.UPGRADE_FROM_CONFIG, value); } } + + public StreamsConfigBuilder WithUpgradeFrom(string upgradeFrom) + { + var clone = Clone(); + clone.UpgradeFrom = upgradeFrom; + return clone; + } + + public string WindowedInnerClassSerde { get { return GetProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE); } set { SetProperty(StreamsConfig.WINDOWED_INNER_CLASS_SERDE, value); } } + + public StreamsConfigBuilder WithWindowedInnerClassSerde(string windowedInnerClassSerde) + { + var clone = Clone(); + clone.WindowedInnerClassSerde = windowedInnerClassSerde; + return clone; + } + + public long WindowStoreChangeLogAdditionalRetentionMs{ get { return GetProperty(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG); } set { SetProperty(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, value); } } + + public StreamsConfigBuilder WithWindowStoreChangeLogAdditionalRetentionMs(long windowStoreChangeLogAdditionalRetentionMs) + { + var clone = Clone(); + clone.WindowStoreChangeLogAdditionalRetentionMs = windowStoreChangeLogAdditionalRetentionMs; + return clone; + } + + public long WindowSizeMs { get { return GetProperty(StreamsConfig.WINDOW_SIZE_MS_CONFIG); } set { SetProperty(StreamsConfig.WINDOW_SIZE_MS_CONFIG, value); } } + + public StreamsConfigBuilder WithWindowSizeMs(long windowSizeMs) + { + var clone = Clone(); + clone.WindowSizeMs = windowSizeMs; + return clone; + } + } } diff --git a/src/net/KafkaBridge/GenericConfigBuilder.cs b/src/net/KafkaBridge/GenericConfigBuilder.cs new file mode 100644 index 0000000000..b888e8c3c3 --- /dev/null +++ b/src/net/KafkaBridge/GenericConfigBuilder.cs @@ -0,0 +1,112 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KafkaBridge.Java.Util; + +namespace MASES.KafkaBridge +{ + /// + /// Generic base configuration class + /// + /// + public class GenericConfigBuilder : System.ComponentModel.INotifyPropertyChanged + where T : GenericConfigBuilder, new() + { + /// + /// Creates an instance of + /// + /// The instance of + public static T Create() { return new T(); } + + public static implicit operator Properties(GenericConfigBuilder clazz) { return clazz.ToProperties(); } + + /// + public event System.ComponentModel.PropertyChangedEventHandler PropertyChanged; + + System.Collections.Generic.Dictionary _options = new(); + /// + /// Reads the set + /// + /// The propert type + /// The property name to be get + /// or if property does not exists + public TData GetProperty(string propertyName) + { + if (_options.TryGetValue(propertyName, out var result)) + { + return (TData)result; + } + return default; + } + /// + /// Set the with + /// + /// The property name to be set + /// The property value to be set + public void SetProperty(string propertyName, object value) + { + if (!_options.ContainsKey(propertyName)) + { + _options.Add(propertyName, value); + } + else _options[propertyName] = value; + + PropertyChanged?.Invoke(this, new System.ComponentModel.PropertyChangedEventArgs(propertyName)); + } + /// + /// Cones this instance + /// + /// cloned + protected virtual T Clone() + { + var clone = new T + { + _options = new System.Collections.Generic.Dictionary(_options) + }; return clone; + } + /// + /// Returns the from the instance + /// + /// containing the properties + public Properties ToProperties() + { + Properties props = new(); + foreach (var item in _options) + { + props.Put(item.Key, item.Value); + } + + return props; + } + + /// + /// Returns the from the instance + /// + /// containing the properties + public Map ToMap() + { + HashMap props = new(); + foreach (var item in _options) + { + props.Put(item.Key, item.Value.ToString()); + } + + return props; + } + } +} diff --git a/src/net/KafkaBridge/KafkaBridge.csproj b/src/net/KafkaBridge/KafkaBridge.csproj index 3a50d343ef..9384ceb1f5 100644 --- a/src/net/KafkaBridge/KafkaBridge.csproj +++ b/src/net/KafkaBridge/KafkaBridge.csproj @@ -9,7 +9,7 @@ MASES s.r.l. MASES s.r.l. MASES s.r.l. - 1.1.7.0 + 1.1.8.0 KafkaBridge true net461;netcoreapp3.1;net5.0;net6.0 @@ -61,4 +61,7 @@ + + + diff --git a/src/net/KafkaBridge/KafkaBridgeCore.cs b/src/net/KafkaBridge/KafkaBridgeCore.cs index 5816d2e872..680d37f898 100644 --- a/src/net/KafkaBridge/KafkaBridgeCore.cs +++ b/src/net/KafkaBridge/KafkaBridgeCore.cs @@ -599,7 +599,11 @@ string buildClassPath() if (_logClassPath) { - Console.WriteLine("ClassPath is: {0}", classPath); + Console.WriteLine("ClassPath is defined from:"); + foreach (var item in classPath.Split(InternalConst.PathSeparator)) + { + Console.WriteLine(item); + } } return classPath; } diff --git a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj index 87d495aac5..c21f22d4eb 100644 --- a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj +++ b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.csproj @@ -10,7 +10,7 @@ MASES s.r.l. MASES s.r.l. MASES s.r.l. - 1.1.7.0 + 1.1.8.0 KafkaBridgeCLI true net461;netcoreapp3.1;net5.0;net6.0 diff --git a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec index 034053367f..7d86920824 100644 --- a/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec +++ b/src/net/KafkaBridgeCLI/KafkaBridgeCLI.nuspec @@ -2,7 +2,7 @@ MASES.KafkaBridgeCLI - 1.1.7 + 1.1.8 KafkaBridgeCLI - CLI interface of KafkaBridge MASES s.r.l. MASES s.r.l. diff --git a/src/net/templates/templatepack.csproj b/src/net/templates/templatepack.csproj index 4b87430841..9f9ae9b5f5 100644 --- a/src/net/templates/templatepack.csproj +++ b/src/net/templates/templatepack.csproj @@ -1,7 +1,7 @@ Template - 1.1.7.0 + 1.1.8.0 MASES.KafkaBridge.Templates KafkaBridge Templates - Templates to use the KafkaBridge MASES s.r.l. diff --git a/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj b/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj index efb576a123..b4438ce7f4 100644 --- a/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj +++ b/src/net/templates/templates/kafkabridgeConsumerApp/kafkabridgeConsumerApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj b/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj index efb576a123..b4438ce7f4 100644 --- a/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj +++ b/src/net/templates/templates/kafkabridgePipeStreamApp/kafkabridgePipeStreamApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj b/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj index efb576a123..b4438ce7f4 100644 --- a/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj +++ b/src/net/templates/templates/kafkabridgeProducerApp/kafkabridgeProducerApp.csproj @@ -12,6 +12,6 @@ - + diff --git a/tests/KafkaBridgeTest/KafkaBridgeTest.csproj b/tests/KafkaBridgeTest/KafkaBridgeTest.csproj index 905909424e..8aa38dde72 100644 --- a/tests/KafkaBridgeTest/KafkaBridgeTest.csproj +++ b/tests/KafkaBridgeTest/KafkaBridgeTest.csproj @@ -8,9 +8,10 @@ Copyright © MASES s.r.l. 2022 MASES s.r.l. MASES s.r.l. - 1.1.6.0 - net461;netcoreapp3.1;net5.0;net5.0-windows;net6.0;net6.0-windows + 1.1.8.0 + net461;netcoreapp3.1;net5.0;net6.0 ..\..\bin\ + latest diff --git a/tests/KafkaBridgeTest/Program.cs b/tests/KafkaBridgeTest/Program.cs index 7b76afcf50..1589059078 100644 --- a/tests/KafkaBridgeTest/Program.cs +++ b/tests/KafkaBridgeTest/Program.cs @@ -56,15 +56,15 @@ static void Main(string[] args) serverToUse = args[0]; } - createTopic(); + CreateTopic(); - Thread threadProduce = new Thread(produceSomething) + Thread threadProduce = new Thread(ProduceSomething) { Name = "produce" }; threadProduce.Start(); - Thread threadConsume = new Thread(consumeSomething) + Thread threadConsume = new Thread(ConsumeSomething) { Name = "consume" }; @@ -75,22 +75,30 @@ static void Main(string[] args) Thread.Sleep(2000); } - static void createTopic() + static void CreateTopic() { try { string topicName = topicToUse; int partitions = 1; short replicationFactor = 1; - var topicConfig = TopicConfig.DynClazz; var topic = new NewTopic(topicName, partitions, replicationFactor); - var map = Collections.SingletonMap((string)topicConfig.CLEANUP_POLICY_CONFIG, (string)topicConfig.CLEANUP_POLICY_COMPACT); + + /**** Direct mode ****** + var map = Collections.SingletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); topic.Configs(map); + *********/ + topic.Configs(TopicConfigBuilder.Create().WithCleanupPolicy(TopicConfig.CleanupPolicy.Compact)); + var coll = Collections.Singleton(topic); + /**** Direct mode ****** Properties props = new Properties(); props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse); + *******/ + + Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(serverToUse).ToProperties(); using (var admin = KafkaAdminClient.Create(props)) { @@ -115,10 +123,11 @@ static void createTopic() } } - static void produceSomething() + static void ProduceSomething() { try { + /**** Direct mode ****** Properties props = new Properties(); props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse); props.Put(ProducerConfig.ACKS_CONFIG, "all"); @@ -126,6 +135,16 @@ static void produceSomething() props.Put(ProducerConfig.LINGER_MS_CONFIG, 1); props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + ******/ + + Properties props = ProducerConfigBuilder.Create() + .WithBootstrapServers(serverToUse) + .WithAcks(ProducerConfig.Acks.All) + .WithRetries(0) + .WithLingerMs(1) + .WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer") + .WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer") + .ToProperties(); Serializer keySerializer = null; Serializer valueSerializer = null; @@ -189,10 +208,11 @@ static void produceSomething() } } - static void consumeSomething() + static void ConsumeSomething() { try { + /**** Direct mode ****** Properties props = new Properties(); props.Put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse); props.Put(ConsumerConfig.GROUP_ID_CONFIG, "test"); @@ -200,6 +220,16 @@ static void consumeSomething() props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + *******/ + + Properties props = ConsumerConfigBuilder.Create() + .WithBootstrapServers(serverToUse) + .WithGroupId("test") + .WithEnableAutoCommit(true) + .WithAutoCommitIntervalMs(1000) + .WithKeyDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer") + .WithValueDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer") + .ToProperties(); Deserializer keyDeserializer = null; Deserializer valueDeserializer = null; diff --git a/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj b/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj index 430fe6d76d..02d93b00aa 100644 --- a/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj +++ b/tests/KafkaBridgeTestAdmin/KafkaBridgeTestAdmin.csproj @@ -8,9 +8,10 @@ Copyright © MASES s.r.l. 2022 MASES s.r.l. MASES s.r.l. - 1.1.6.0 + 1.1.8.0 net461;netcoreapp3.1;net5.0;net6.0 ..\..\bin\ + latest diff --git a/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj b/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj index 9dca33b3dd..3a6867d9c6 100644 --- a/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj +++ b/tests/KafkaBridgeTestStreams/KafkaBridgeTestStreams.csproj @@ -8,9 +8,10 @@ Copyright © MASES s.r.l. 2022 MASES s.r.l. MASES s.r.l. - 1.1.6.0 - net461;netcoreapp3.1;net5.0;net5.0-windows;net6.0;net6.0-windows + 1.1.8.0 + net461;netcoreapp3.1;net5.0;net6.0 ..\..\bin\ + latest