Skip to content

Commit

Permalink
Upgrade to JCOBridge 2.4.0 and added some improvements (#9)
Browse files Browse the repository at this point in the history
* #2: migration to JCOBridge 2.4.1

* Added missing template and fix

* #4: added new API

* #6: better implementation of some classes
  • Loading branch information
masesdevelopers authored Nov 21, 2021
1 parent b3a5cef commit f45868f
Show file tree
Hide file tree
Showing 36 changed files with 562 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public CreateTopicsOptions validateOnly(bool validateOnly)

public bool shouldValidateOnly()
{
return (bool)IExecute("shouldValidateOnly");
return IExecute<bool>("shouldValidateOnly");
}

public CreateTopicsOptions retryOnQuotaViolation(bool validateOnly)
Expand All @@ -41,7 +41,7 @@ public CreateTopicsOptions retryOnQuotaViolation(bool validateOnly)

public bool shouldRetryOnQuotaViolation()
{
return (bool)IExecute("shouldRetryOnQuotaViolation");
return IExecute<bool>("shouldRetryOnQuotaViolation");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public DeleteTopicsOptions retryOnQuotaViolation(bool validateOnly)

public bool shouldRetryOnQuotaViolation()
{
return (bool)IExecute("shouldRetryOnQuotaViolation");
return IExecute<bool>("shouldRetryOnQuotaViolation");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
/*
* Copyright 2021 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KafkaBridge.BridgedClasses.Clients.Admin
namespace MASES.KafkaBridge.Clients.Admin
{
class DescribeClusterResult
public class DescribeClusterResult : JCOBridge.C2JBridge.JVMBridgeBase<DescribeClusterResult>
{
public override string ClassName => "org.apache.kafka.clients.admin.DescribeClusterResult";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KafkaBridge.Clients.Admin
{
public class DescribeConsumerGroupsResult : JCOBridge.C2JBridge.JVMBridgeBase<DescribeConsumerGroupsResult>
{
public override string ClassName => "org.apache.kafka.clients.admin.DescribeConsumerGroupsResult";
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
/*
* Copyright 2021 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KafkaBridge.BridgedClasses.Clients.Admin
namespace MASES.KafkaBridge.Clients.Admin
{
class DescribeTopicsResult
public class DescribeTopicsResult : JCOBridge.C2JBridge.JVMBridgeBase<DescribeTopicsResult>
{
public override string ClassName => "org.apache.kafka.clients.admin.DescribeTopicsResult";
}
}
25 changes: 25 additions & 0 deletions src/KafkaBridge/BridgedClasses/Clients/Admin/ElectLeadersResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KafkaBridge.Clients.Admin
{
public class ElectLeadersResult : JCOBridge.C2JBridge.JVMBridgeBase<ElectLeadersResult>
{
public override string ClassName => "org.apache.kafka.clients.admin.ElectLeadersResult";
}
}
22 changes: 19 additions & 3 deletions src/KafkaBridge/BridgedClasses/Clients/Admin/KafkaAdminClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Refer to LICENSE for more information.
*/

using MASES.KafkaBridge.Common;
using MASES.KafkaBridge.Java.Util;

namespace MASES.KafkaBridge.Clients.Admin
Expand All @@ -31,17 +32,32 @@ public static KafkaAdminClient Create(Properties props)

public CreateTopicsResult CreateTopics(Collection<NewTopic> newTopics)
{
return IExecute<CreateTopicsResult>("createTopics", newTopics.Instance);
return New<CreateTopicsResult>("createTopics", newTopics.Instance);
}

public CreateTopicsResult CreateTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options)
{
return IExecute<CreateTopicsResult>("createTopics", newTopics.Instance, options.Instance);
return New<CreateTopicsResult>("createTopics", newTopics.Instance, options.Instance);
}

public CreateTopicsResult DeleteTopics(Collection<NewTopic> newTopics)
{
return IExecute<CreateTopicsResult>("deleteTopics", newTopics.Instance);
return New<CreateTopicsResult>("deleteTopics", newTopics.Instance);
}

public DescribeConsumerGroupsResult DescribeConsumerGroups(Collection<string> groupIds)
{
return New<DescribeConsumerGroupsResult>("describeConsumerGroups", groupIds.Instance);
}

public ListConsumerGroupsResult ListConsumerGroups()
{
return New<ListConsumerGroupsResult>("listConsumerGroups");
}

public ElectLeadersResult ElectLeaders(ElectionType electionType, Set<TopicPartition> partitions)
{
return New<ElectLeadersResult>("electLeaders", (byte)electionType, partitions.Instance);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KafkaBridge.Clients.Admin
{
public class ListConsumerGroupsResult : JCOBridge.C2JBridge.JVMBridgeBase<ListConsumerGroupsResult>
{
public override string ClassName => "org.apache.kafka.clients.admin.ListConsumerGroupsResult";
}
}
27 changes: 20 additions & 7 deletions src/KafkaBridge/BridgedClasses/Clients/Admin/ListTopicsResult.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
/*
* Copyright 2021 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KafkaBridge.BridgedClasses.Clients.Admin
namespace MASES.KafkaBridge.Clients.Admin
{
class ListTopicsResult
public class ListTopicsResult : JCOBridge.C2JBridge.JVMBridgeBase<ListTopicsResult>
{
public override string ClassName => "org.apache.kafka.clients.admin.ListTopicsResult";
}
}
29 changes: 17 additions & 12 deletions src/KafkaBridge/BridgedClasses/Clients/Consumer/ConsumerRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,40 @@
* Refer to LICENSE for more information.
*/

using MASES.KafkaBridge.Common.Header;
using System;

namespace MASES.KafkaBridge.Clients.Consumer
{
public class ConsumerRecord : JCOBridge.C2JBridge.JVMBridgeBase<ConsumerRecord>
public class ConsumerRecord<K, V> : JCOBridge.C2JBridge.JVMBridgeBase<ConsumerRecord<K, V>>
{
public override string ClassName => "org.apache.kafka.clients.consumer.ConsumerRecord";

public string Topic => (string)IExecute("topic");
public string Topic => IExecute<string>("topic");

public int Partition => IExecute<int>("partition");

public int Partition => (int)IExecute("partition");
public Headers Headers => New<Headers>("headers");

public object Headers => IExecute("headers");
public K Key => IExecute<K>("key");

public object Key => IExecute("key");
public V Value => IExecute<V>("value");

public object Value => IExecute("value");
public long Offset => IExecute<long>("offset");

public long Offset => (long)IExecute("offset");
public DateTime DateTime => DateTimeOffset.FromUnixTimeMilliseconds(Timestamp).DateTime;

public long Timestamp => (long)IExecute("timestamp");
public long Timestamp => IExecute<long>("timestamp");

public object TimestampType => IExecute("timestampType");

public int SerializedKeySize => (int)IExecute("serializedKeySize");
public int SerializedKeySize => IExecute<int>("serializedKeySize");

public int SerializedValueSize => (int)IExecute("serializedValueSize");
public int SerializedValueSize => IExecute<int>("serializedValueSize");
}

public class ConsumerRecord<K, V> : JCOBridge.C2JBridge.JVMBridgeBase<ConsumerRecord<K, V>>
public class ConsumerRecord : ConsumerRecord<object, object>
{
public override string ClassName => "org.apache.kafka.clients.consumer.ConsumerRecord";

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

namespace MASES.KafkaBridge.Clients.Consumer
{
public class ConsumerRecords : JCOBridge.C2JBridge.JVMBridgeBaseEnumerable<ConsumerRecords, ConsumerRecord>
public class ConsumerRecords<K, V> : JCOBridge.C2JBridge.JVMBridgeBaseEnumerable<ConsumerRecords<K, V>, ConsumerRecord<K, V>>
{
public override string ClassName => "org.apache.kafka.clients.consumer.ConsumerRecords";
}

public class ConsumerRecords<K, V> : JCOBridge.C2JBridge.JVMBridgeBase<ConsumerRecords<K, V>>
public class ConsumerRecords : ConsumerRecords<object, object>
{
public override string ClassName => "org.apache.kafka.clients.consumer.ConsumerRecords";

}
}
10 changes: 5 additions & 5 deletions src/KafkaBridge/BridgedClasses/Clients/Consumer/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

namespace MASES.KafkaBridge.Clients.Consumer
{
public class KafkaConsumer : JCOBridge.C2JBridge.JVMBridgeBase<KafkaConsumer>
public class KafkaConsumer<K, V> : JCOBridge.C2JBridge.JVMBridgeBase<KafkaConsumer<K, V>>
{
public override string ClassName => "org.apache.kafka.clients.consumer.KafkaConsumer";

Expand Down Expand Up @@ -49,14 +49,14 @@ public void Assign(Collection<TopicPartition> partitions)
IExecute("assign", partitions.Instance);
}

public ConsumerRecords Poll(long timeoutMs)
public ConsumerRecords<K, V> Poll(long timeoutMs)
{
return IExecute<ConsumerRecords>("poll", timeoutMs);
return New<ConsumerRecords<K, V>>("poll", timeoutMs);
}
}

public class KafkaConsumer<K, V> : JCOBridge.C2JBridge.JVMBridgeBase<KafkaConsumer<K, V>>
public class KafkaConsumer : KafkaConsumer<object, object>
{
public override string ClassName => "org.apache.kafka.clients.consumer.KafkaConsumer";

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public void AbortTransaction()

public Future<RecordMetadata> Send(ProducerRecord record)
{
return IExecute<Future<RecordMetadata>>("send", record.Instance);
return New<Future<RecordMetadata>>("send", record.Instance);
}

public Future<RecordMetadata> Send<K, V>(ProducerRecord<K, V> record)
{
return IExecute<Future<RecordMetadata>>("send", record.Instance);
return New<Future<RecordMetadata>>("send", record.Instance);
}

public void Flush()
Expand Down
19 changes: 4 additions & 15 deletions src/KafkaBridge/BridgedClasses/Clients/Producer/ProducerRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,13 @@ public ProducerRecord(string topic, V value)
{
}

public string Topic => (string)IExecute("topic");
public string Topic => IExecute<string>("topic");

public K Key => (K)IExecute("key");
public K Key => IExecute<K>("key");

public V Value => (V)IExecute("value");
public V Value => IExecute<V>("value");

public long Timestamp => (long)IExecute("timestamp");

/// <inheritdoc cref="object.ToString"/>
public override string ToString()
{
if (Instance != null)
{
return (string) (IExecute("toString") as MASES.JCOBridge.C2JBridge.JVMInterop.IJavaObject).ToPrimitive();
}

return base.ToString();
}
public long Timestamp => IExecute<long>("timestamp");
}

public class ProducerRecord : ProducerRecord<object, object>
Expand Down
Loading

0 comments on commit f45868f

Please sign in to comment.