Skip to content

Commit

Permalink
Enhancement on Stream processing (#18)
Browse files Browse the repository at this point in the history
* Upgrade version

* #7: fix on Streams event

* Added extra constructor in consumer/producer

* Update test for latest changes

* #6: fixed serdes, update template and test program

* #4: improved KafkaAdminClient

* #6: improved Producer/Consumer

* #7: initial set of almost Streams classes and interfaces, with associated tests

* #2: upgrade to JCOBridge 2.4.6
  • Loading branch information
masesdevelopers authored Feb 7, 2022
1 parent 00e4ea4 commit 89e8a56
Show file tree
Hide file tree
Showing 223 changed files with 12,415 additions and 330 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

- name: Install local file to be used within Javadoc plugin of generated POM
run: mvn install:install-file -DgroupId=JCOBridge -DartifactId=JCOBridge -Dversion=2.4.5 -Dpackaging=jar -Dfile=./bin/net5.0/JCOBridge.jar
run: mvn install:install-file -DgroupId=JCOBridge -DartifactId=JCOBridge -Dversion=2.4.6 -Dpackaging=jar -Dfile=./bin/net5.0/JCOBridge.jar
shell: bash

- name: Create Jars
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ jobs:

- name: Maven preparation (step 2)
if: matrix.language == 'java'
run: mvn "install:install-file" "-DgroupId=JCOBridge" "-DartifactId=JCOBridge" "-Dversion=2.4.5" "-Dpackaging=jar" "-Dfile=./bin/net5.0/JCOBridge.jar"
run: mvn "install:install-file" "-DgroupId=JCOBridge" "-DartifactId=JCOBridge" "-Dversion=2.4.6" "-Dpackaging=jar" "-Dfile=./bin/net5.0/JCOBridge.jar"

- if: matrix.language == 'java'
run: mvn --file ./src/java/kafkabridge/pom.xml --no-transfer-progress package
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pullrequest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

- name: Install local file to be used within Javadoc plugin of generated POM
run: mvn install:install-file -DgroupId=JCOBridge -DartifactId=JCOBridge -Dversion=2.4.5 -Dpackaging=jar -Dfile=./bin/net5.0/JCOBridge.jar
run: mvn install:install-file -DgroupId=JCOBridge -DartifactId=JCOBridge -Dversion=2.4.6 -Dpackaging=jar -Dfile=./bin/net5.0/JCOBridge.jar
shell: bash

- name: Create Jars
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

- name: Install local file to be used within Javadoc plugin of generated POM
run: mvn install:install-file -DgroupId=JCOBridge -DartifactId=JCOBridge -Dversion=2.4.5 -Dpackaging=jar -Dfile=./bin/net5.0/JCOBridge.jar
run: mvn install:install-file -DgroupId=JCOBridge -DartifactId=JCOBridge -Dversion=2.4.6 -Dpackaging=jar -Dfile=./bin/net5.0/JCOBridge.jar
shell: bash

- name: Create Jars
Expand Down
27 changes: 27 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Compiled source #
###################
*.class

# Intermediate files #
###################
*.classpath
*.filelist
*.manifest
*.csproj.user

# Support files #
###################
launchSettings.json

###############
# folder #
###############
/**/.vs/
/**/.idea/
/**/target/
/**/DROP/
/**/TEMP/
/**/packages/
/**/bin/
/**/obj/
_site
1 change: 1 addition & 0 deletions src/java/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
###############
# folder #
###############
/**/.idea/
/**/target/
/**/DROP/
/**/TEMP/
Expand Down
4 changes: 2 additions & 2 deletions src/java/kafkabridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<name>mases.kafkabridge</name>
<description>Apache Kafka interface bridging implementation</description>
<url>https://github.com/masesgroup/KafkaBridge</url>
<version>1.1.4.0</version>
<version>1.1.5.0</version>

<licenses>
<license>
Expand Down Expand Up @@ -175,7 +175,7 @@
<additionalDependency>
<groupId>JCOBridge</groupId>
<artifactId>JCOBridge</artifactId>
<version>2.4.5</version>
<version>2.4.6</version>
</additionalDependency>
</additionalDependencies>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* MIT License
*
* Copyright (c) 2022 MASES s.r.l.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.mases.kafkabridge.clients.common.serialization;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.mases.jcobridge.*;

import java.util.Map;

public final class DeserializerImpl extends JCListener implements Deserializer {
public DeserializerImpl(String key) throws JCNativeException {
super(key);
}

public Object deserialize(String topic, byte[] data) {
raiseEvent("deserialize", topic, null, data);
Object retVal = getReturnData();
return retVal;
}

public Object deserialize(String topic, Headers headers, byte[] data) {
raiseEvent("deserializeWithHeaders", topic, headers, data);
Object retVal = getReturnData();
return retVal;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* MIT License
*
* Copyright (c) 2022 MASES s.r.l.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.mases.kafkabridge.clients.common.serialization;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.mases.jcobridge.*;

public final class SerializerImpl extends JCListener implements Serializer {
public SerializerImpl(String key) throws JCNativeException {
super(key);
}

public byte[] serialize(String topic, Object data) {
raiseEvent("serialize", topic, data);
Object retVal = getReturnData();
return (byte[])retVal;
}

public byte[] serialize(String topic, Headers headers, Object data) {
raiseEvent("serializeWithHeaders", topic, headers, data);
Object retVal = getReturnData();
return (byte[])retVal;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* MIT License
*
* Copyright (c) 2022 MASES s.r.l.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.mases.kafkabridge.streams;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.mases.jcobridge.*;

import java.util.Map;

public final class KafkaClientSupplierImpl extends JCListener implements KafkaClientSupplier {
public KafkaClientSupplierImpl(String key) throws JCNativeException {
super(key);
}

@Override
public Admin getAdmin(final Map<String, Object> config) {
raiseEvent("getAdmin", config);
Object retVal = getReturnData();
return (Admin) retVal;
}

@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
raiseEvent("getProducer", config);
Object retVal = getReturnData();
return (Producer<byte[], byte[]>) retVal;
}

@Override
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
raiseEvent("getConsumer", config);
Object retVal = getReturnData();
return (Consumer<byte[], byte[]>) retVal;
}

@Override
public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
raiseEvent("getRestoreConsumer", config);
Object retVal = getReturnData();
return (Consumer<byte[], byte[]>) retVal;
}

@Override
public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
raiseEvent("getGlobalConsumer", config);
Object retVal = getReturnData();
return (Consumer<byte[], byte[]>) retVal;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* MIT License
*
* Copyright (c) 2022 MASES s.r.l.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.mases.kafkabridge.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.mases.jcobridge.*;

public final class StateListenerImpl extends JCListener implements KafkaStreams.StateListener {
public StateListenerImpl(String key) throws JCNativeException {
super(key);
}

@Override
public void onChange(final KafkaStreams.State newState, KafkaStreams.State oldState) {
raiseEvent("onChange", newState, oldState);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* MIT License
*
* Copyright (c) 2022 MASES s.r.l.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.mases.kafkabridge.streams.errors;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.mases.jcobridge.*;

public final class StreamsUncaughtExceptionHandlerImpl extends JCListener implements StreamsUncaughtExceptionHandler {
public StreamsUncaughtExceptionHandlerImpl(String key) throws JCNativeException {
super(key);
}

@Override
public StreamThreadExceptionResponse handle(final Throwable exception) {
raiseEvent("handle", exception);
Object retVal = getReturnData();
return StreamThreadExceptionResponse.valueOf((String) retVal);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* MIT License
*
* Copyright (c) 2022 MASES s.r.l.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package org.mases.kafkabridge.streams.kstream;

import org.apache.kafka.streams.kstream.Aggregator;
import org.mases.jcobridge.*;

public final class AggregatorImpl extends JCListener implements Aggregator {
public AggregatorImpl(String key) throws JCNativeException {
super(key);
}

@Override
public Object apply(Object key, Object value, Object aggregate) {
raiseEvent("apply", key, value, aggregate);
Object retVal = getReturnData();
return retVal;
}
}
Loading

0 comments on commit 89e8a56

Please sign in to comment.