Skip to content

Commit

Permalink
#19: added CLI API and many classes of Apache Kafka Connect (#62)
Browse files Browse the repository at this point in the history
* #19: added CLI API and many classes of Apache Kafka Connect

* Added CLI info on start-up (from scripts in bin folder)

* #19: Added MirrorMaker to cover CLI interface before complete other connect classes

* KNetCLI reports the ClassToRun in lexical order; fix new MirrorMaker name
  • Loading branch information
masesdevelopers authored May 10, 2022
1 parent 5764c14 commit 1434a92
Show file tree
Hide file tree
Showing 70 changed files with 2,565 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/net/Documentation/articles/usageCLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ _knet_ accepts the following command-line switch:
* TransactionsCommand
* VerifiableConsumer
* VerifiableProducer
* Connect scope:
* ConnectDistributed
* ConnectStandalone
* MirrorMaker2
* **KafkaLocation**: represents the path to the root folder of Apache Kafka binary distribution; default value consider that KNetCLI uses the Apache Kafka jars available under the jars folder prepared from the package;
* **ScalaVersion**: the scala version to be used. The default version (_2.13.6_) is binded to the deafult Apache Kafka version available in the package;
* **Log4JConfiguration**: the log4j configuration file; the default uses the file within the package.
38 changes: 38 additions & 0 deletions src/net/KNet/ClientSide/BridgedClasses/Common/Config/Config.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using Java.Util;
using MASES.JCOBridge.C2JBridge;

namespace MASES.KNet.Common.Config
{
public class Config : JVMBridgeBase<Config>
{
public override string ClassName => "org.apache.kafka.common.config.Config";

public Config()
{
}

public Config(List<ConfigValue> configValues): base(configValues)
{
}

public List<ConfigValue> ConfigValues => IExecute<List<ConfigValue>>("configValues");
}
}
40 changes: 40 additions & 0 deletions src/net/KNet/ClientSide/BridgedClasses/Common/Config/ConfigDef.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.JCOBridge.C2JBridge;

namespace MASES.KNet.Common.Config
{
public class ConfigDef : JVMBridgeBase<ConfigDef>
{
public override string ClassName => "org.apache.kafka.common.config.ConfigDef";

public ConfigDef()
{
}

public ConfigDef(ConfigDef baseDef) : base(baseDef)
{
}

private static readonly object v = SExecute("NO_DEFAULT_VALUE");
public static object NO_DEFAULT_VALUE = v;

// TO BE COMPLETED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using Java.Util;
using MASES.JCOBridge.C2JBridge;

namespace MASES.KNet.Common.Config
{
public class ConfigValue : JVMBridgeBase<ConfigValue>
{
public override string ClassName => "org.apache.kafka.common.config.ConfigValue";

[System.Obsolete("This is not public in Apache Kafka API")]
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public ConfigValue()
{
}

public ConfigValue(string name)
:base(name)
{
}

public ConfigValue(string name, object value, List<object> recommendedValues, List<string> errorMessages)
:base(name, value, recommendedValues, errorMessages)
{
}

public string Name => IExecute<string>("name");

public object Value
{
get { return IExecute("value"); }
set { IExecute("value", value); }
}

public List<object> RecommendedValues
{
get { return IExecute<List<object>>("recommendedValues"); }
set { IExecute("recommendedValues", value); }
}

public List<string> ErrorMessages => IExecute<List<string>>("errorMessages");

public bool Visible
{
get { return IExecute<bool>("visible"); }
set { IExecute("visible", value); }
}

public void AddErrorMessage(string errorMessage) => IExecute("addErrorMessage", errorMessage);
}
}
36 changes: 36 additions & 0 deletions src/net/KNet/ClientSide/BridgedClasses/Common/Configurable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using Java.Util;
using MASES.JCOBridge.C2JBridge;

namespace MASES.KNet.Common
{
public interface IConfigurable : IJVMBridgeBase
{
void Configure(Map<string, object> configs);
}

public class Configurable : JVMBridgeBase<Configurable, IConfigurable>, IConfigurable
{
public override bool IsInterface => true;
public override string ClassName => "org.apache.kafka.common.Configurable";

public void Configure(Map<string, object> configs) => IExecute("configure", configs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KNet.Connect.Cli
{
/// <summary>
/// Class managing ConnectDistributed
/// </summary>
public class ConnectDistributed : JCOBridge.C2JBridge.JVMBridgeMain<ConnectDistributed>
{
/// <summary>
/// Initialize a new <see cref="ConnectDistributed"/>
/// </summary>
public ConnectDistributed()
: base("org.apache.kafka.connect.cli.ConnectDistributed")
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

namespace MASES.KNet.Connect.Cli
{
/// <summary>
/// Class managing ConnectStandalone
/// </summary>
public class ConnectStandalone : JCOBridge.C2JBridge.JVMBridgeMain<ConnectStandalone>
{
/// <summary>
/// Initialize a new <see cref="ConnectStandalone"/>
/// </summary>
public ConnectStandalone()
: base("org.apache.kafka.connect.cli.ConnectStandalone")
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.JCOBridge.C2JBridge;

namespace MASES.KNet.Connect.Components
{
public class Versioned : JVMBridgeBase<Versioned>
{
public override bool IsInterface => true;
public override string ClassName => "org.apache.kafka.connect.components.Versioned";

public string Version => IExecute<string>("version");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2022 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using Java.Lang;
using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Connect.Data;
using MASES.KNet.Connect.Header;

namespace MASES.KNet.Connect.Connector
{
public class ConnectRecord<R> : JVMBridgeBase<ConnectRecord<R>> where R : ConnectRecord<R>
{
public override bool IsAbstract => true;
public override string ClassName => "org.apache.kafka.connect.connector.ConnectRecord";

[System.Obsolete("This is not public in Apache Kafka API")]
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public ConnectRecord()
{

}

public ConnectRecord(string topic, int kafkaPartition,
Schema keySchema, object key,
Schema valueSchema, object value,
long timestamp)
: base(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp)
{
}

public ConnectRecord(string topic, int kafkaPartition,
Schema keySchema, object key,
Schema valueSchema, object value,
long timestamp, Iterable<Header.Header> headers)
: base(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers)
{
}

public ConnectRecord(params object[] args) : base(args)
{
}

public string Topic => IExecute<string>("topic");

public int KafkaPartition => IExecute<int>("kafkaPartition");

public object Key => IExecute("key");

public Schema KeySchema => IExecute<Schema>("keySchema");

public object Value => IExecute("value");

public Schema ValueSchema => IExecute<Schema>("valueSchema");

public long Timestamp => IExecute<long>("timestamp");

public Headers Headers => IExecute<Headers>("headers");

public R NewRecord(string topic, int kafkaPartition, Schema keySchema, object key, Schema valueSchema, object value, long timestamp) => IExecute<R>("newRecord", topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp);

public R NewRecord(string topic, int kafkaPartition, Schema keySchema, object key, Schema valueSchema, object value, long timestamp, Iterable<Header.Header> headers) => IExecute<R>("newRecord", topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers);
}
}
Loading

0 comments on commit 1434a92

Please sign in to comment.