diff --git a/.gitattributes b/.gitattributes
index 76e79f17a4..5b45130a2f 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1,2 +1,5 @@
*.psd1 diff
-*.psm1 diff
\ No newline at end of file
+*.psm1 diff
+
+*.sh text eol=lf
+*.yaml text=auto
\ No newline at end of file
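For reference, the effect of the new *.sh rule can be checked locally with git check-attr; a minimal sketch (the path is one of the scripts added by this change):

    # ask git which attributes apply to a shell script tracked in the repo
    git check-attr text eol -- src/container/KNetRun.sh
    # expected output once the rule above is in effect:
    # src/container/KNetRun.sh: text: set
    # src/container/KNetRun.sh: eol: lf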
diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 65633f0ce5..b1aa59c4ba 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -132,11 +132,10 @@ jobs:
fetch-depth: '1'
submodules: 'true'
- - name: Pre compile tests
- run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" tests\net\KNetTest.sln
-
- name: Pre compile
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet\KNet.csproj
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
- name: Set up Apache Maven Central
uses: actions/setup-java@v4
@@ -175,7 +174,7 @@ jobs:
- name: Recompile to create nuget packages
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet.sln
-
+
- name: Prepare PowerShell package
run: |
Copy-Item .\src\net\KNetPS\MASES.KNetPS.psd1 -Destination .\MASES.KNetPS\MASES.KNetPS.psd1 -Force
@@ -192,4 +191,156 @@ jobs:
- uses: actions/upload-artifact@v4
with:
name: KNet
- path: .\bin\*nupkg
\ No newline at end of file
+ path: .\bin\*nupkg
+
+ - name: Compile tests
+ run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" tests\net\KNetTest.sln
+ env:
+ GITHUB_TEST_PREPARATION: true
+
+ - uses: actions/upload-artifact@v4
+ with:
+ name: KNet_bin_${{ github.sha }}
+ path: .\bin\*
+
+ - uses: actions/upload-artifact@v4
+ with:
+ name: KNet_jars_${{ github.sha }}
+ path: .\jars\*
+
+ - name: Save KNet bin in cache
+ uses: actions/cache/save@v4
+ with:
+ enableCrossOsArchive: true
+ path: ./bin/
+ key: KNet_bin_${{ github.sha }}
+
+ build_container_knettest:
+ needs: check_changes
+ if: "always() && needs.check_changes.outputs.run_build_windows == 'true'"
+ runs-on: ubuntu-latest
+ # Steps represent a sequence of tasks that will be executed as part of the job
+ steps:
+ # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
+ - uses: actions/checkout@v4
+ with:
+ fetch-depth: '1'
+ submodules: 'true'
+
+ - name: Pre compile
+ run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src/net/KNet/KNet.csproj
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
+ - name: Set up Apache Maven Central
+ uses: actions/setup-java@v4
+ with: # running setup-java again overwrites the settings.xml
+ distribution: temurin
+ java-version: 11
+ cache: 'maven'
+ server-id: ossrh # Value of the distributionManagement/repository/id field of the pom.xml
+ server-username: MAVEN_USERNAME # env variable for username in deploy
+ server-password: MAVEN_CENTRAL_TOKEN # env variable for token in deploy
+ gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} # Value of the GPG private key to import
+ gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase
+
+ - name: Create Jars
+ run: mvn --file ./src/jvm/knet/pom.xml --no-transfer-progress package
+ env:
+ MAVEN_USERNAME: ${{ secrets.MAVEN_USERNAME }}
+ MAVEN_CENTRAL_TOKEN: ${{ secrets.MAVEN_CENTRAL_TOKEN }}
+ MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }}
+
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v3
+
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v3
+
+ - name: Login to GitHub Container Registry
+ uses: docker/login-action@v3
+ with:
+ registry: ghcr.io
+ username: ${{ github.actor }}
+ password: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Extract metadata (tags, labels) for Docker
+ id: meta
+ uses: docker/metadata-action@v5
+ with:
+ images: |
+ ghcr.io/${{ github.repository_owner }}/knettest
+
+ - name: Build and push
+ uses: docker/build-push-action@v5
+ with:
+ file: ./src/container/DockerfileKNet.linux
+ context: .
+ platforms: linux/amd64,linux/arm64
+ push: true
+ tags: ${{ steps.meta.outputs.tags }}
+ labels: ${{ steps.meta.outputs.labels }}
+
+ execute_tests:
+ needs: [build_windows, build_container_knettest]
+ services:
+ kafka:
+ # Private registry image
+ image: ghcr.io/${{ github.repository_owner }}/knettest:${{ github.ref_name }}
+ credentials:
+ username: ${{ github.actor }}
+ password: ${{ secrets.GITHUB_TOKEN }}
+ ports:
+ - 9092:9092
+ env:
+ KNET_RUNNING_MODE: standalone
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+
+ strategy:
+ fail-fast: false
+ matrix:
+ os: [ 'ubuntu-latest' ] #, 'windows-latest' ]
+ framework: [ 'net462', 'net6.0', 'net8.0' ]
+ buffered: [ 'runBuffered', '']
+ extraValue: [ '', 'withBigExtraValue', 'withBigBigExtraValue' ]
+ jdk_vendor: [ 'temurin', 'zulu', 'microsoft', 'corretto', 'oracle']
+ jdk_version: [ '11', '17', '21' ] # only LTS versions
+ exclude:
+ - os: ubuntu-latest
+ framework: net462
+ - jdk_vendor: oracle
+ jdk_version: 11
+
+ runs-on: ${{ matrix.os }}
+ steps:
+ - name: Restore KNet bin from cache
+ uses: actions/cache/restore@v4
+ with:
+ fail-on-cache-miss: true
+ enableCrossOsArchive: true
+ path: ./bin/
+ key: KNet_bin_${{ github.sha }}
+
+ - name: Set up JDK distribution
+ uses: actions/setup-java@v4
+ with: # running setup-java again overwrites the settings.xml
+ distribution: ${{ matrix.jdk_vendor }}
+ java-version: ${{ matrix.jdk_version }}
+
+ - name: Executing KNetTest on Ubuntu with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: matrix.os == 'ubuntu-latest'
+ run: dotnet ./bin/${{ matrix.framework }}/KNetTest.dll localhost:9092 randomizeTopicName ${{ matrix.buffered }} ${{ matrix.extraValue }}
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+
+ - name: Executing KNetTest on Windows with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' && matrix.framework != 'net462' }}
+ run: dotnet .\bin\${{ matrix.framework }}\KNetTest.dll localhost:9092 randomizeTopicName ${{ matrix.buffered }} ${{ matrix.extraValue }}
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+
+ - name: Executing KNetTest (net462) on Windows with ${{ matrix.jdk_vendor }} ${{ matrix.jdk_version }}
+ if: ${{ matrix.os == 'windows-latest' && matrix.framework == 'net462' }}
+ run: .\bin\${{ matrix.framework }}\KNetTest.exe localhost:9092 randomizeTopicName ${{ matrix.buffered }} ${{ matrix.extraValue }}
+ env:
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
\ No newline at end of file
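The new execute_tests job consumes the cached ./bin output and the knettest service container published above. A rough local equivalent, assuming the image has been pushed for the current branch and a JCOBridge license is supplied if required (tag and framework folder are illustrative):

    # start the Kafka test container in standalone mode (ZooKeeper + broker in one container)
    docker run -d --name knettest -p 9092:9092 \
      -e KNET_RUNNING_MODE=standalone \
      ghcr.io/masesgroup/knettest:master

    # run the test assembly produced by the build_windows job against it
    dotnet ./bin/net8.0/KNetTest.dll localhost:9092 randomizeTopicName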
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index 49bef71043..54249efe42 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -115,7 +115,9 @@ jobs:
- name: Maven preparation (step 1)
if: matrix.language == 'java'
run: dotnet build --no-incremental --configuration Release --framework net8.0 /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" ./src/net/KNet/KNet.csproj
-
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
- if: matrix.language == 'java'
run: mvn --file ./src/jvm/knet/pom.xml --no-transfer-progress package
diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml
index ae50618290..67cb13c60e 100644
--- a/.github/workflows/docker.yaml
+++ b/.github/workflows/docker.yaml
@@ -25,7 +25,9 @@ jobs:
- name: Pre compile
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src/net/KNet/KNet.csproj
-
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
- name: Set up Apache Maven Central
uses: actions/setup-java@v4
with: # running setup-java again overwrites the settings.xml
@@ -94,7 +96,9 @@ jobs:
- name: Pre compile
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src/net/KNet/KNet.csproj
-
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
- name: Set up Apache Maven Central
uses: actions/setup-java@v4
with: # running setup-java again overwrites the settings.xml
diff --git a/.github/workflows/generateclasses.yaml b/.github/workflows/generateclasses.yaml
index 9a77e3b19a..9e6e5a3ff8 100644
--- a/.github/workflows/generateclasses.yaml
+++ b/.github/workflows/generateclasses.yaml
@@ -45,7 +45,9 @@ jobs:
- name: Pre compile
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet\KNet.csproj
-
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
- name: Set up Apache Maven Central
uses: actions/setup-java@v4
with: # running setup-java again overwrites the settings.xml
@@ -103,6 +105,10 @@ jobs:
env:
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ONLINE }}
+ - name: Try compilation
+ if: ${{ inputs.UseLatestJNetReflector == true }}
+ run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet\KNet.csproj
+
- name: Request a PR to commit changes made with unpublished version of JNetReflector
if: ${{ github.repository_owner == 'masesgroup' && inputs.GeneratePR == true && inputs.UseLatestJNetReflector == true }} #do not push any changes outside main repo or GeneratePR is false
uses: peter-evans/create-pull-request@v6
diff --git a/.github/workflows/pullrequest.yaml b/.github/workflows/pullrequest.yaml
index f656ea4727..5385f1da04 100644
--- a/.github/workflows/pullrequest.yaml
+++ b/.github/workflows/pullrequest.yaml
@@ -67,7 +67,9 @@ jobs:
- name: Pre compile
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src/net/KNet/KNet.csproj
-
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
- name: Set up Apache Maven Central
uses: actions/setup-java@v4
with: # running setup-java again overwrites the settings.xml
@@ -117,7 +119,9 @@ jobs:
- name: Pre compile
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet\KNet.csproj
-
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
- name: Set up Apache Maven Central
uses: actions/setup-java@v4
with: # running setup-java again overwrites the settings.xml
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 0597c49477..25a38f8557 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -31,7 +31,9 @@ jobs:
- name: Pre compile
run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet\KNet.csproj
-
+ env:
+ GITHUB_SIMPLIFIED_GENERATION: true
+
- name: Set up Apache Maven Central
uses: actions/setup-java@v4
with: # running setup-java again overwrites the settings.xml
diff --git a/src/container/DockerfileKNet.linux b/src/container/DockerfileKNet.linux
index 6dfd9a9a7b..154d9894e6 100644
--- a/src/container/DockerfileKNet.linux
+++ b/src/container/DockerfileKNet.linux
@@ -8,20 +8,35 @@ ENV KNET_DOCKER_BUILD_ACTIONS=true
# Restore as distinct layers
RUN dotnet restore KNetDocker.sln -a $TARGETARCH
# Build and publish a release
-RUN dotnet publish ./KNetCLI/KNetCLI.csproj --framework net8.0 -c Release -o out -a $TARGETARCH
+RUN dotnet publish ./KNetCLI/KNetCLI.csproj /p:NoWarn="0108%3B1030%3B0618" --framework net8.0 -c Release -o out -a $TARGETARCH
# Build runtime image
FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/runtime:8.0-jammy
+LABEL org.opencontainers.image.authors='https://github.com/masesgroup' \
+ org.opencontainers.image.url='https://knet.masesgroup.com' \
+ org.opencontainers.image.documentation='https://knet.masesgroup.com' \
+ org.opencontainers.image.source='https://github.com/masesgroup/KNet' \
+ org.opencontainers.image.vendor='MASES Group' \
+ org.opencontainers.image.licenses='Apache-2.0' \
+ org.opencontainers.image.description="KNet Server-Client container"
+
# Add JRE
RUN apt-get update && apt-get install -y --no-install-recommends openjdk-17-jre-headless && rm -rf /var/lib/apt/lists/*
ADD ./jars /app/jars
ADD ./src/config /app/config
+ADD ./src/config/kraft /app/config/kraft
+ADD ./src/container/config_container /app/config_container
+ADD ./src/container/KNetRun.sh /app
ENV JCOBRIDGE_JVMPath=/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
ENV JCOBRIDGE_LicensePath=
+EXPOSE 2181
+EXPOSE 9092
+
WORKDIR /app
COPY --from=build-env /app/out .
-ENTRYPOINT ["dotnet", "MASES.KNetCLI.dll"]
+RUN chmod +x /app/KNetRun.sh
+CMD /app/KNetRun.sh
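With the new KNetRun.sh entrypoint the same image can act as a ZooKeeper node, a broker, or both, driven by KNET_RUNNING_MODE; when the variable is unset the arguments are still forwarded to MASES.KNetCLI.dll. A sketch of the split-role case, assuming the image is tagged knet:latest and using an illustrative Docker network:

    docker network create knet-net
    # ZooKeeper-only container (port 2181 is exposed by the image)
    docker run -d --name zk --network knet-net -p 2181:2181 \
      -e KNET_RUNNING_MODE=zookeeper knet:latest
    # broker-only container pointing at the ZooKeeper container
    docker run -d --name kafka --network knet-net -p 9092:9092 \
      -e KNET_RUNNING_MODE=broker \
      -e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
      knet:latest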
diff --git a/src/container/DockerfileKNetConnect.linux b/src/container/DockerfileKNetConnect.linux
index d2486b3712..51996c328e 100644
--- a/src/container/DockerfileKNetConnect.linux
+++ b/src/container/DockerfileKNetConnect.linux
@@ -8,7 +8,7 @@ ENV KNET_DOCKER_BUILD_ACTIONS=true
# Restore as distinct layers
RUN dotnet restore KNetDocker.sln -a $TARGETARCH
# Build and publish a release
-RUN dotnet publish ./KNetConnect/KNetConnect.csproj --framework net8.0 -c Release -o out -a $TARGETARCH
+RUN dotnet publish ./KNetConnect/KNetConnect.csproj /p:NoWarn="0108%3B1030%3B0618" --framework net8.0 -c Release -o out -a $TARGETARCH
# Build runtime image
FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/runtime:8.0-jammy
@@ -16,6 +16,14 @@ FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/runtime:8.0-jammy
# Add JRE
RUN apt-get update && apt-get install -y --no-install-recommends openjdk-17-jre-headless && rm -rf /var/lib/apt/lists/*
+LABEL org.opencontainers.image.authors='https://github.com/masesgroup' \
+ org.opencontainers.image.url='https://knet.masesgroup.com' \
+ org.opencontainers.image.documentation='https://knet.masesgroup.com' \
+ org.opencontainers.image.source='https://github.com/masesgroup/KNet' \
+ org.opencontainers.image.vendor='MASES Group' \
+ org.opencontainers.image.licenses='Apache-2.0' \
+ org.opencontainers.image.description="KNet Connect container"
+
ADD ./jars /app/jars
ADD ./src/config /app/config
diff --git a/src/container/DockerfileKNetTest.linux b/src/container/DockerfileKNetTest.linux
new file mode 100644
index 0000000000..e959edbfd6
--- /dev/null
+++ b/src/container/DockerfileKNetTest.linux
@@ -0,0 +1,43 @@
+FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/sdk:8.0-jammy AS build-env
+ARG TARGETARCH
+WORKDIR /app
+
+# Copy everything
+COPY ./src/net ./
+ENV KNET_DOCKER_BUILD_ACTIONS=true
+# Restore as distinct layers
+RUN dotnet restore KNetDocker.sln -a $TARGETARCH
+# Build and publish a release
+RUN dotnet publish ./KNetCLI/KNetCLI.csproj --framework net8.0 -c Release -o out -a $TARGETARCH
+
+# Build runtime image
+FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/runtime:8.0-jammy
+
+LABEL org.opencontainers.image.authors='https://github.com/masesgroup' \
+ org.opencontainers.image.url='https://knet.masesgroup.com' \
+ org.opencontainers.image.documentation='https://knet.masesgroup.com' \
+ org.opencontainers.image.source='https://github.com/masesgroup/KNet' \
+ org.opencontainers.image.vendor='MASES Group' \
+ org.opencontainers.image.licenses='Apache-2.0' \
+ org.opencontainers.image.description="KNet Test container"
+
+# Add JRE
+RUN apt-get update && apt-get install -y --no-install-recommends openjdk-17-jre-headless && rm -rf /var/lib/apt/lists/*
+
+ADD ./jars /app/jars
+ADD ./src/config /app/config
+ADD ./src/container/KNetTestRun.sh /app
+ADD ./src/container/zookeeper.properties /app
+ADD ./src/container/server.properties /app
+ADD ./src/container/log4j.properties /app
+
+ENV JCOBRIDGE_JVMPath=/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
+ENV JCOBRIDGE_LicensePath=
+
+EXPOSE 2181
+EXPOSE 9092
+
+WORKDIR /app
+COPY --from=build-env /app/out .
+RUN chmod +x /app/KNetTestRun.sh
+CMD /app/KNetTestRun.sh
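The workflow builds this file through docker/build-push-action for linux/amd64 and linux/arm64; a local sketch of the same build, assuming the jars have already been produced by the Maven step (without --push the multi-platform result only lands in the build cache):

    docker buildx build \
      --platform linux/amd64,linux/arm64 \
      -f ./src/container/DockerfileKNetTest.linux \
      -t ghcr.io/masesgroup/knettest:local \
      .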
diff --git a/src/container/KNetRun.sh b/src/container/KNetRun.sh
new file mode 100644
index 0000000000..022d540dd5
--- /dev/null
+++ b/src/container/KNetRun.sh
@@ -0,0 +1,159 @@
+#!/bin/bash -e
+
+if [[ -z "${KNET_RUNNING_MODE}" ]]; then
+ echo "Starting command line execution of KNetCLI with arguments: $@"
+ # Generic execution
+ dotnet /app/MASES.KNetCLI.dll $@
+else
+### inherited from https://github.com/wurstmeister/kafka-docker/blob/901c084811fa9395f00af3c51e0ac6c32c697034/start-kafka.sh
+
+ # Store original IFS config, so we can restore it at various stages
+ ORIG_IFS=$IFS
+
+ if [[ -z "$KAFKA_PORT" ]]; then
+ export KAFKA_PORT=9092
+ fi
+
+ if [[ -z "$KAFKA_ADVERTISED_PORT" && \
+ -z "$KAFKA_LISTENERS" && \
+ -z "$KAFKA_ADVERTISED_LISTENERS" && \
+ -S /var/run/docker.sock ]]; then
+ KAFKA_ADVERTISED_PORT=$(docker port "$(hostname)" $KAFKA_PORT | sed -r 's/.*:(.*)/\1/g' | head -n1)
+ export KAFKA_ADVERTISED_PORT
+ fi
+
+ if [[ -z "$KAFKA_BROKER_ID" ]]; then
+ if [[ -n "$BROKER_ID_COMMAND" ]]; then
+ KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
+ export KAFKA_BROKER_ID
+ else
+ # By default auto allocate broker ID
+ export KAFKA_BROKER_ID=-1
+ fi
+ fi
+
+ if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
+ sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
+ unset KAFKA_HEAP_OPTS
+ fi
+
+ if [[ -n "$HOSTNAME_COMMAND" ]]; then
+ HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")
+
+ # Replace any occurrences of _{HOSTNAME_COMMAND} with the value
+ IFS=$'\n'
+ for VAR in $(env); do
+ if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then
+ eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"
+ fi
+ done
+ IFS=$ORIG_IFS
+ fi
+
+ if [[ -n "$PORT_COMMAND" ]]; then
+ PORT_VALUE=$(eval "$PORT_COMMAND")
+
+ # Replace any occurrences of _{PORT_COMMAND} with the value
+ IFS=$'\n'
+ for VAR in $(env); do
+ if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then
+ eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"
+ fi
+ done
+ IFS=$ORIG_IFS
+ fi
+
+ if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
+ KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")
+ export KAFKA_BROKER_RACK
+ fi
+
+ # Try and configure minimal settings or exit with error if there isn't enough information
+ if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
+ # Maintain existing behaviour
+ # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
+ export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
+ fi
+
+ #Issue newline to config file in case there is not one already
+ echo "" >> /app/config_container/server.properties
+
+ #Issue newline to config file in case there is not one already
+ echo "" >> /app/config_container/zookeeper.properties
+
+ (
+ function updateConfig() {
+ key=$1
+ value=$2
+ file=$3
+
+ # Omit $value here, in case there is sensitive information
+ echo "[Configuring] '$key' in '$file'"
+
+ # If config exists in file, replace it. Otherwise, append to file.
+ if grep -E -q "^#?$key=" "$file"; then
+ sed -r -i "s@^#?$key=.*@$key=$value@g" "$file" #note that no config values may contain an '@' char
+ else
+ echo "$key=$value" >> "$file"
+ fi
+ }
+
+ # Fixes #312
+ # KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-9_]* /opt/kafka/bin | sort | uniq | tr '\n' '|'
+ EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|"
+
+ # Read in env as a new-line separated array. This handles the case of env variables that have spaces and/or carriage returns. See #313
+ IFS=$'\n'
+ for VAR in $(env)
+ do
+ env_var=$(echo "$VAR" | cut -d= -f1)
+ if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then
+ echo "Excluding $env_var from broker config"
+ continue
+ fi
+
+ if [[ $env_var =~ ^KAFKA_ ]]; then
+ kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
+ updateConfig "$kafka_name" "${!env_var}" "/app/config_container/server.properties"
+ fi
+
+ if [[ $env_var =~ ^ZOOKEEPER_ ]]; then
+ zookeeper_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
+ updateConfig "$zookeeper_name" "${!env_var}" "/app/config_container/zookeeper.properties"
+ fi
+
+ if [[ $env_var =~ ^LOG4J_ ]]; then
+ log4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .)
+ updateConfig "$log4j_name" "${!env_var}" "/app/config_container/log4j.properties"
+ fi
+ done
+ )
+
+### end inherited from https://github.com/wurstmeister/kafka-docker/blob/901c084811fa9395f00af3c51e0ac6c32c697034/start-kafka.sh
+
+ if [ ${KNET_RUNNING_MODE} = "zookeeper" ]; then
+ echo "Starting zookeeper"
+ # Start zookeeper
+ dotnet /app/MASES.KNetCLI.dll zookeeperstart -Log4JConfiguration /app/config_container/log4j.properties /app/config_container/zookeeper.properties
+ elif [ ${KNET_RUNNING_MODE} = "broker" ]; then
+ echo "Starting broker"
+ # Start kafka broker
+ dotnet /app/MASES.KNetCLI.dll kafkastart -Log4JConfiguration /app/config_container/log4j.properties /app/config_container/server.properties
+ elif [ ${KNET_RUNNING_MODE} = "standalone" ]; then
+ echo "Starting zookeeper"
+ # Start zookeeper
+ dotnet /app/MASES.KNetCLI.dll zookeeperstart -Log4JConfiguration /app/config_container/log4j.properties /app/config_container/zookeeper.properties &
+
+ echo "Starting broker"
+ # Start kafka broker
+ dotnet /app/MASES.KNetCLI.dll kafkastart -Log4JConfiguration /app/config_container/log4j.properties /app/config_container/server.properties &
+
+ # Wait for any process to exit
+ wait -n
+
+ # Exit with status of process that exited first
+ exit $?
+ else
+ echo "KNET_RUNNING_MODE exist, but its value (${KNET_RUNNING_MODE}) is not zookeeper, broker or standalone"
+ fi
+fi
\ No newline at end of file
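For clarity, the key transformation used by updateConfig above (strip the KAFKA_ prefix, lowercase, turn underscores into dots) can be reproduced in isolation; a minimal sketch with an illustrative variable:

    env_var=KAFKA_ADVERTISED_LISTENERS
    echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .
    # prints: advertised.listeners
    # so `-e KAFKA_ADVERTISED_LISTENERS=...` on `docker run` ends up as
    # `advertised.listeners=...` in /app/config_container/server.properties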
diff --git a/src/container/config_container/log4j.properties b/src/container/config_container/log4j.properties
new file mode 100644
index 0000000000..94a97a1620
--- /dev/null
+++ b/src/container/config_container/log4j.properties
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Unspecified loggers and loggers with additivity=true output to server.log and stdout
+# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
+log4j.rootLogger=DEBUG, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
+log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Change the line below to adjust ZK client logging
+log4j.logger.org.apache.zookeeper=INFO
+
+# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
+log4j.logger.kafka=INFO
+log4j.logger.org.apache.kafka=INFO
+
+# Change to DEBUG or TRACE to enable request logging
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+# Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
+# related to the handling of requests
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+# Change the line below to adjust KRaft mode controller logging
+log4j.logger.org.apache.kafka.controller=INFO, controllerAppender
+log4j.additivity.org.apache.kafka.controller=false
+
+# Change the line below to adjust ZK mode controller logging
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=INFO, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
+log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
+log4j.additivity.kafka.authorizer.logger=false
+
diff --git a/src/container/config_container/server.properties b/src/container/config_container/server.properties
new file mode 100644
index 0000000000..d194846386
--- /dev/null
+++ b/src/container/config_container/server.properties
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
+# See kafka.server.KafkaConfig for additional details and defaults
+#
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. If not configured, the host name will be equal to the value of
+# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://0.0.0.0:9092
+
+# Listener name, hostname and port the broker will advertise to clients.
+# If not set, it uses the value for "listeners".
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+listener.security.protocol.map=PLAINTEXT:PLAINTEXT
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=127.0.0.1:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=18000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
diff --git a/src/container/config_container/zookeeper.properties b/src/container/config_container/zookeeper.properties
new file mode 100644
index 0000000000..90f4332ec3
--- /dev/null
+++ b/src/container/config_container/zookeeper.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+# Disable the adminserver by default to avoid port conflicts.
+# Set the port to something non-conflicting if choosing to enable this
+admin.enableServer=false
+# admin.serverPort=8080
diff --git a/src/net/KNet/KNet.csproj b/src/net/KNet/KNet.csproj
index d99960cde9..9010cd3e8b 100644
--- a/src/net/KNet/KNet.csproj
+++ b/src/net/KNet/KNet.csproj
@@ -21,18 +21,27 @@
-
+
-
+
-
-
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/net/KNet/KNetCore.cs b/src/net/KNet/KNetCore.cs
index b5ed8a5cb7..a8b5faee33 100644
--- a/src/net/KNet/KNetCore.cs
+++ b/src/net/KNet/KNetCore.cs
@@ -142,11 +142,13 @@ protected override string[] ProcessCommandLine()
/// If does not have a corresponding implemented
protected virtual void PrepareMainClassToRun(string className)
{
- if (string.IsNullOrEmpty(className)) return;
+ if (string.IsNullOrWhiteSpace(className)) return;
+ var invariantLowClassName = className.ToLowerInvariant();
Type type = null;
foreach (var item in typeof(KNetCore<>).Assembly.ExportedTypes)
{
- if (item.Name == className || item.FullName == className)
+ if (item.Name.ToLowerInvariant() == invariantLowClassName
+ || item.FullName.ToLowerInvariant() == invariantLowClassName)
{
type = item;
break;
@@ -190,7 +192,10 @@ protected virtual void PrepareMainClassToRun(string className)
///
public static bool? ApplicationDisableJMX { get; set; }
- string _classToRun;
+ /// <summary>
+ /// value can be overridden in subclasses
+ /// </summary>
+ protected string _classToRun;
/// <summary>
/// The class to run in CLI version
/// </summary>
diff --git a/src/net/KNetCLI/KNetCLICore.cs b/src/net/KNetCLI/KNetCLICore.cs
index 1014b2b354..dd3371e0d3 100644
--- a/src/net/KNetCLI/KNetCLICore.cs
+++ b/src/net/KNetCLI/KNetCLICore.cs
@@ -132,7 +132,20 @@ public override IEnumerable CommandLineArguments
protected override string[] ProcessCommandLine()
{
- var result = base.ProcessCommandLine();
+ var result = base.ProcessCommandLine(); // returns the args filtered so far
+
+ if (string.IsNullOrWhiteSpace(_classToRun) && result != null && result.Length > 0)
+ {
+ // try to use first argument as ClassToRun
+ _classToRun = result[0];
+ int remaining = result.Length - 1;
+ if (remaining != 0)
+ {
+ string[] tmp_result = new string[remaining];
+ Array.Copy(result, 1, tmp_result, 0, remaining); // remove first argument
+ result = tmp_result;
+ }
+ }
_Interactive = ParsedArgs.Exist(CLIParam.Interactive[0]);
_NoLogo = ParsedArgs.Exist(CLIParam.NoLogo[0]);
@@ -157,6 +170,14 @@ protected override string[] ProcessCommandLine()
if (!namespaceList.Contains(item.Namespace)) namespaceList.Add(item.Namespace);
}
}
+ var knetAssembly = typeof(KNetCore<>).Assembly;
+ foreach (var item in knetAssembly.GetExportedTypes())
+ {
+ if (item.IsPublic)
+ {
+ if (!namespaceList.Contains(item.Namespace)) namespaceList.Add(item.Namespace);
+ }
+ }
if (ParsedArgs.Exist(CLIParam.NamespaceList[0]))
{
var namespaces = ParsedArgs.Get(CLIParam.JarList[0]).Split(',', ';');
@@ -180,37 +201,37 @@ protected override string[] ProcessCommandLine()
PrepareMainClassToRun(ClassToRun);
- switch (ClassToRun)
+ switch (ClassToRun.ToLowerInvariant())
{
- case "VerifiableConsumer":
+ case "verifiableconsumer":
ApplicationHeapSize = "512M";
break;
- case "VerifiableProducer":
+ case "verifiableproducer":
ApplicationHeapSize = "512M";
break;
- case "StreamsResetter":
+ case "streamsresetter":
ApplicationHeapSize = "512M";
break;
- case "ZooKeeperStart":
+ case "zookeeperstart":
ApplicationHeapSize = "512M";
ApplicationInitialHeapSize = "512M";
break;
- case "ZooKeeperShell":
+ case "zookeepershell":
ApplicationHeapSize = "512M";
ApplicationInitialHeapSize = "512M";
break;
- case "KafkaStart":
+ case "kafkastart":
ApplicationHeapSize = Environment.Is64BitOperatingSystem ? "1G" : "512M";
ApplicationInitialHeapSize = Environment.Is64BitOperatingSystem ? "1G" : "512M";
break;
- case "ConnectStandalone":
- case "ConnectDistributed":
- case "KNetConnectStandalone":
- case "KNetConnectDistributed":
+ case "connectstandalone":
+ case "connectdistributed":
+ case "knetconnectstandalone":
+ case "knetconnectdistributed":
{
throw new ArgumentException($"Use KNetConnect to run KNet Connect SDK");
}
- case "MirrorMaker2":
+ case "mirrormaker2":
{
ApplicationLog4JPath = Path.Combine(Const.AssemblyLocation, "config", "connect-log4j.properties");
ApplicationHeapSize = "2G";
diff --git a/tests/net/Common/SharedKNetCore.cs b/tests/net/Common/SharedKNetCore.cs
index 0788af0950..5449547932 100644
--- a/tests/net/Common/SharedKNetCore.cs
+++ b/tests/net/Common/SharedKNetCore.cs
@@ -16,12 +16,40 @@
* Refer to LICENSE for more information.
*/
+using Java.Lang;
using System;
namespace MASES.KNet.TestCommon
{
internal class SharedKNetCore : KNetCore
{
+ public static void Create()
+ {
+ ApplicationJarRootPath = Const.DefaultJarsPath;
+ CreateGlobalInstance();
+ }
+
+ public static int ManageException(System.Exception e)
+ {
+ int retCode = 0;
+ if (e is ClassNotFoundException cnfe)
+ {
+ Console.WriteLine($"Failed with {cnfe}, current ClassPath is {SharedKNetCore.GlobalInstance.ClassPath}");
+ retCode = 1;
+ }
+ else if (e is NoClassDefFoundError ncdfe)
+ {
+ Console.WriteLine($"Failed with {ncdfe}, current ClassPath is {SharedKNetCore.GlobalInstance.ClassPath}");
+ retCode = 1;
+ }
+ else
+ {
+ Console.WriteLine($"Failed with {e}");
+ retCode = 1;
+ }
+ return retCode;
+ }
+
public override bool LogClassPath => false;
public long CurrentJNICalls => JVMStats.TotalJNICalls;
diff --git a/tests/net/KNetBenchmark/ProgramInit.cs b/tests/net/KNetBenchmark/ProgramInit.cs
index a382ffb18e..e2cfebc2ce 100644
--- a/tests/net/KNetBenchmark/ProgramInit.cs
+++ b/tests/net/KNetBenchmark/ProgramInit.cs
@@ -422,6 +422,7 @@ partial class Program
static void Init(string[] args)
{
+ BenchmarkKNetCore.ApplicationJarRootPath = Const.DefaultJarsPath;
BenchmarkKNetCore.ApplicationHeapSize = "4G";
BenchmarkKNetCore.ApplicationInitialHeapSize = "4G";
BenchmarkKNetCore.CreateGlobalInstance();
diff --git a/tests/net/KNetClassicTest/Program.cs b/tests/net/KNetClassicTest/Program.cs
index 934ea06812..6d95674248 100644
--- a/tests/net/KNetClassicTest/Program.cs
+++ b/tests/net/KNetClassicTest/Program.cs
@@ -47,7 +47,7 @@ class Program
static void Main(string[] args)
{
- SharedKNetCore.CreateGlobalInstance();
+ SharedKNetCore.Create();
var appArgs = SharedKNetCore.FilteredArgs;
if (appArgs.Length != 0)
@@ -55,24 +55,31 @@ static void Main(string[] args)
serverToUse = args[0];
}
- CreateTopic();
-
- Thread threadProduce = new(ProduceSomething)
+ try
{
- Name = "produce"
- };
- threadProduce.Start();
+ CreateTopic();
- Thread threadConsume = new(ConsumeSomething)
- {
- Name = "consume"
- };
- threadConsume.Start();
+ Thread threadProduce = new(ProduceSomething)
+ {
+ Name = "produce"
+ };
+ threadProduce.Start();
+
+ Thread threadConsume = new(ConsumeSomething)
+ {
+ Name = "consume"
+ };
+ threadConsume.Start();
- Console.CancelKeyPress += Console_CancelKeyPress;
- Console.WriteLine("Press Ctrl-C to exit");
- resetEvent.WaitOne();
- Thread.Sleep(2000); // wait the threads exit
+ Console.CancelKeyPress += Console_CancelKeyPress;
+ Console.WriteLine("Press Ctrl-C to exit");
+ resetEvent.WaitOne();
+ Thread.Sleep(2000); // wait for the threads to exit
+ }
+ catch (Exception e)
+ {
+ Environment.ExitCode = SharedKNetCore.ManageException(e);
+ }
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
@@ -129,7 +136,9 @@ static void CreateTopic()
catch (Java.Util.Concurrent.ExecutionException ex)
{
Console.WriteLine(ex.InnerException.Message);
+ throw;
}
+ catch (Org.Apache.Kafka.Common.Errors.TopicExistsException) { }
catch (Exception e)
{
Console.WriteLine(e.Message);
diff --git a/tests/net/KNetCompactedReplicatorTest/KNetCompactedReplicatorTest.csproj b/tests/net/KNetCompactedReplicatorTest/KNetCompactedReplicatorTest.csproj
index 3679ea8e6c..ccdf4ccd72 100644
--- a/tests/net/KNetCompactedReplicatorTest/KNetCompactedReplicatorTest.csproj
+++ b/tests/net/KNetCompactedReplicatorTest/KNetCompactedReplicatorTest.csproj
@@ -1,11 +1,11 @@
- KNetTest
+ KNetCompactedReplicatorTest
Exe
- MASES.KNetTest
- KNetTest - a test tool for KNet
- KNetTest - a test tool for KNet
+ MASES.KNetCompactedReplicatorTest
+ KNetCompactedReplicatorTest - a test tool for KNet
+ KNetCompactedReplicatorTest - a test tool for KNet
diff --git a/tests/net/KNetCompactedReplicatorTest/Program.cs b/tests/net/KNetCompactedReplicatorTest/Program.cs
index 156b15e8f2..26c5b47e2f 100644
--- a/tests/net/KNetCompactedReplicatorTest/Program.cs
+++ b/tests/net/KNetCompactedReplicatorTest/Program.cs
@@ -23,7 +23,7 @@
using System.Diagnostics;
using System.Threading;
-namespace MASES.KNetTest
+namespace MASES.KNetCompactedReplicatorTest
{
class Program
{
@@ -55,37 +55,44 @@ public override string ToString()
static void Main(string[] args)
{
- SharedKNetCore.CreateGlobalInstance();
+ SharedKNetCore.Create();
var appArgs = SharedKNetCore.FilteredArgs;
- if (appArgs.Length != 0)
+ try
{
- serverToUse = args[0];
+ if (appArgs.Length != 0)
+ {
+ serverToUse = args[0];
+ }
+ var sw = Stopwatch.StartNew();
+ TestValues("TestValues", 100, UpdateModeTypes.OnDelivery, 5);
+ sw.Stop();
+ Console.WriteLine($"End TestValues in {sw.Elapsed}");
+ sw = Stopwatch.StartNew();
+ Test("TestOnDelivery", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed, 5);
+ sw.Stop();
+ Console.WriteLine($"End TestOnDelivery in {sw.Elapsed}");
+ sw = Stopwatch.StartNew();
+ Test("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5);
+ sw.Stop();
+ Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
+ sw = Stopwatch.StartNew();
+ TestOnlyRead("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5);
+ sw.Stop();
+ Console.WriteLine($"End TestOnlyRead for TestOnConsume in {sw.Elapsed}");
+ sw = Stopwatch.StartNew();
+ Test("TestOnConsumeLessConsumers", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5, 2);
+ sw.Stop();
+ Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
+ Console.CancelKeyPress += Console_CancelKeyPress;
+ Console.WriteLine("Press Ctrl-C to exit");
+ resetEvent.WaitOne();
+ Thread.Sleep(2000); // wait for the threads to exit
+ }
+ catch (Exception e)
+ {
+ Environment.ExitCode = SharedKNetCore.ManageException(e);
}
- var sw = Stopwatch.StartNew();
- TestValues("TestValues", 100, UpdateModeTypes.OnDelivery, 5);
- sw.Stop();
- Console.WriteLine($"End TestValues in {sw.Elapsed}");
- sw = Stopwatch.StartNew();
- Test("TestOnDelivery", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed, 5);
- sw.Stop();
- Console.WriteLine($"End TestOnDelivery in {sw.Elapsed}");
- sw = Stopwatch.StartNew();
- Test("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5);
- sw.Stop();
- Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
- sw = Stopwatch.StartNew();
- TestOnlyRead("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5);
- sw.Stop();
- Console.WriteLine($"End TestOnlyRead for TestOnConsume in {sw.Elapsed}");
- sw = Stopwatch.StartNew();
- Test("TestOnConsumeLessConsumers", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5, 2);
- sw.Stop();
- Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
- Console.CancelKeyPress += Console_CancelKeyPress;
- Console.WriteLine("Press Ctrl-C to exit");
- resetEvent.WaitOne();
- Thread.Sleep(2000); // wait the threads exit
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
@@ -132,7 +139,7 @@ private static void Test(string topicName, int length, UpdateModeTypes type, int
BootstrapServers = serverToUse,
StateName = topicName,
ValueSerDesSelector = typeof(JsonSerDes.Value<>),
- // ValueSerDes = new JsonSerDes.Value(),
+ // ValueSerDes = new JsonSerDes.Value(),
})
{
replicator.StartAndWait();
diff --git a/tests/net/KNetTest/Program.cs b/tests/net/KNetTest/Program.cs
index 445bbc423a..e65f267f31 100644
--- a/tests/net/KNetTest/Program.cs
+++ b/tests/net/KNetTest/Program.cs
@@ -46,6 +46,8 @@ class Program
static bool flushWhileSend = false;
static bool withAck = false;
static bool runInParallel = false;
+ static bool avoidThrows = false;
+ static bool randomizeTopicName = false;
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
@@ -77,7 +79,7 @@ static TestType()
public TestType()
{
-
+
}
public TestType(int i, bool withBigExtraValue, bool bigBigExtraValue)
@@ -106,7 +108,7 @@ public override string ToString()
static void Main(string[] args)
{
- SharedKNetCore.CreateGlobalInstance();
+ SharedKNetCore.Create();
var appArgs = SharedKNetCore.FilteredArgs;
if (appArgs.Length != 0)
@@ -125,6 +127,8 @@ static void Main(string[] args)
if (args[i] == "flushWhileSend") { flushWhileSend = true; continue; }
if (args[i] == "withAck") { withAck = true; continue; }
if (args[i] == "runInParallel") { runInParallel = true; continue; }
+ if (args[i] == "avoidThrows") { avoidThrows = true; continue; }
+ if (args[i] == "randomizeTopicName") { randomizeTopicName = true; continue; }
Console.WriteLine($"Unknown {args[i]}");
}
}
@@ -140,58 +144,71 @@ static void Main(string[] args)
OnDeserialize = (topic, data) => { return new TestType(0, false, false); }
};
- CreateTopic();
- Console.CancelKeyPress += Console_CancelKeyPress;
- Console.WriteLine("Press Ctrl-C to exit");
- if (runInParallel)
+ if (randomizeTopicName)
+ {
+ topicToUse += "-" + Guid.NewGuid().ToString();
+ Console.WriteLine($"Topic name will be {topicToUse}");
+ }
+
+ try
{
- Thread threadProduce;
- Thread threadConsume;
- if (runBuffered)
+ CreateTopic();
+ Console.CancelKeyPress += Console_CancelKeyPress;
+ Console.WriteLine("Press Ctrl-C to exit");
+ if (runInParallel)
{
- threadProduce = new(ProduceSomethingBuffered)
+ Thread threadProduce;
+ Thread threadConsume;
+ if (runBuffered)
{
- Name = "produce buffered"
- };
+ threadProduce = new(ProduceSomethingBuffered)
+ {
+ Name = "produce buffered"
+ };
- threadConsume = new(ConsumeSomethingBuffered)
+ threadConsume = new(ConsumeSomethingBuffered)
+ {
+ Name = "consume buffered"
+ };
+ }
+ else
{
- Name = "consume buffered"
- };
+ threadProduce = new(ProduceSomething)
+ {
+ Name = "produce"
+ };
+
+ threadConsume = new(ConsumeSomething)
+ {
+ Name = "consume"
+ };
+ }
+ threadProduce.Start();
+ if (!onlyProduce) threadConsume.Start();
+ resetEvent.WaitOne(TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 60));
+ resetEvent.Set();
}
else
{
- threadProduce = new(ProduceSomething)
+ if (runBuffered)
{
- Name = "produce"
- };
-
- threadConsume = new(ConsumeSomething)
+ ProduceSomethingBuffered();
+ if (!onlyProduce) ConsumeSomethingBuffered();
+ }
+ else
{
- Name = "consume"
- };
+ ProduceSomething();
+ if (!onlyProduce) ConsumeSomething();
+ }
}
- threadProduce.Start();
- if (!onlyProduce) threadConsume.Start();
- resetEvent.WaitOne(TimeSpan.FromSeconds(System.Diagnostics.Debugger.IsAttached ? 1000 : 60));
- resetEvent.Set();
+ Thread.Sleep(2000); // wait for the threads to exit
+
+ Console.WriteLine($"End of {(runBuffered ? "buffered" : "non buffered")} test");
}
- else
+ catch (Exception e)
{
- if (runBuffered)
- {
- ProduceSomethingBuffered();
- if (!onlyProduce) ConsumeSomethingBuffered();
- }
- else
- {
- ProduceSomething();
- if (!onlyProduce) ConsumeSomething();
- }
+ Environment.ExitCode = SharedKNetCore.ManageException(e);
}
- Thread.Sleep(2000); // wait the threads exit
-
- Console.WriteLine($"End of {(runBuffered ? "buffered" : "non buffered")} test");
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
@@ -228,6 +245,8 @@ static void CreateTopic()
Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(serverToUse).ToProperties();
+ Console.WriteLine($"Creating {topic} using an AdminClient based on {props}");
+
using (IAdmin admin = KafkaAdminClient.Create(props))
{
/******* standard
@@ -246,17 +265,20 @@ static void CreateTopic()
}
catch (Java.Util.Concurrent.ExecutionException ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine(ex.InnerException.Message);
}
catch (TopicExistsException) { }
catch (Exception e)
{
+ if (!avoidThrows) throw;
Console.WriteLine(e.Message);
}
}
static void ProduceSomething()
{
+ Console.WriteLine("Starting ProduceSomething");
try
{
/**** Direct mode ******
@@ -339,16 +361,19 @@ static void ProduceSomething()
}
catch (Java.Util.Concurrent.ExecutionException ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Producer ended with error: {0}", ex.InnerException.Message);
}
catch (Exception ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Producer ended with error: {0}", ex.Message);
}
}
static void ConsumeSomething()
{
+ Console.WriteLine("Starting ConsumeSomething");
try
{
/**** Direct mode ******
@@ -362,7 +387,8 @@ static void ConsumeSomething()
ConsumerConfigBuilder props = ConsumerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithGroupId(Guid.NewGuid().ToString())
- .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.LATEST)
+ .WithAutoOffsetReset(runInParallel ? ConsumerConfigBuilder.AutoOffsetResetTypes.LATEST
+ : ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
.WithEnableAutoCommit(true)
.WithAutoCommitIntervalMs(1000);
@@ -412,7 +438,7 @@ static void ConsumeSomething()
consumer.SeekToBeginning(Collections.Singleton(tp));
}
}
-
+ int cycle = 0;
while (runInParallel ? !resetEvent.WaitOne(0) : elements < NonParallelLimit)
{
var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
@@ -431,6 +457,12 @@ static void ConsumeSomething()
if (consoleOutput) Console.WriteLine(str);
watcher.Stop();
}
+ cycle++;
+ if (elements == 0 && cycle == 60 * 5)
+ {
+ Console.WriteLine("Forcibly exit since no record was received within 1 minute.");
+ break;
+ }
}
watcherTotal.Stop();
}
@@ -445,16 +477,19 @@ static void ConsumeSomething()
}
catch (Java.Util.Concurrent.ExecutionException ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Consumer ended with error: {0}", ex.InnerException.Message);
}
catch (Exception ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Consumer ended with error: {0}", ex.Message);
}
}
static void ProduceSomethingBuffered()
{
+ Console.WriteLine("Starting ProduceSomethingBuffered");
try
{
/**** Direct mode ******
@@ -537,16 +572,19 @@ static void ProduceSomethingBuffered()
}
catch (Java.Util.Concurrent.ExecutionException ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Producer ended with error: {0}", ex.InnerException.Message);
}
catch (Exception ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Producer ended with error: {0}", ex.Message);
}
}
static void ConsumeSomethingBuffered()
{
+ Console.WriteLine("Starting ConsumeSomethingBuffered");
try
{
/**** Direct mode ******
@@ -560,7 +598,8 @@ static void ConsumeSomethingBuffered()
ConsumerConfigBuilder props = ConsumerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithGroupId(Guid.NewGuid().ToString())
- .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.LATEST)
+ .WithAutoOffsetReset(runInParallel ? ConsumerConfigBuilder.AutoOffsetResetTypes.LATEST
+ : ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
.WithEnableAutoCommit(true)
.WithAutoCommitIntervalMs(1000);
@@ -610,7 +649,7 @@ static void ConsumeSomethingBuffered()
consumer.SeekToBeginning(Collections.Singleton(tp));
}
}
-
+ int cycle = 0;
while (runInParallel ? !resetEvent.WaitOne(0) : elements < NonParallelLimit)
{
var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
@@ -629,6 +668,12 @@ static void ConsumeSomethingBuffered()
if (consoleOutput) Console.WriteLine(str);
watcher.Stop();
}
+ cycle++;
+ if (elements == 0 && cycle == 60 * 5)
+ {
+ Console.WriteLine("Forcibly exit since no record was received within 1 minute.");
+ break;
+ }
}
watcherTotal.Stop();
}
@@ -643,10 +688,12 @@ static void ConsumeSomethingBuffered()
}
catch (Java.Util.Concurrent.ExecutionException ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Consumer ended with error: {0}", ex.InnerException.Message);
}
catch (Exception ex)
{
+ if (!avoidThrows) throw;
Console.WriteLine("Consumer ended with error: {0}", ex.Message);
}
}
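The test program takes the bootstrap server as the first argument and the remaining options as positional keywords; a sketch of invocations exercising the new randomizeTopicName and avoidThrows switches (framework folder is illustrative):

    # unique topic per run; exceptions are rethrown so CI fails fast (default behaviour)
    dotnet ./bin/net8.0/KNetTest.dll localhost:9092 randomizeTopicName

    # buffered variant with a larger payload, as in the workflow matrix; exceptions only logged
    dotnet ./bin/net8.0/KNetTest.dll localhost:9092 randomizeTopicName runBuffered withBigExtraValue avoidThrows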
diff --git a/tests/net/KNetTestAdmin/Program.cs b/tests/net/KNetTestAdmin/Program.cs
index 08793a024b..4d57f593c2 100644
--- a/tests/net/KNetTestAdmin/Program.cs
+++ b/tests/net/KNetTestAdmin/Program.cs
@@ -35,7 +35,7 @@ class Program
static void Main(string[] args)
{
- SharedKNetCore.CreateGlobalInstance();
+ SharedKNetCore.Create();
var appArgs = SharedKNetCore.FilteredArgs;
if (appArgs.Length != 0)
@@ -43,15 +43,22 @@ static void Main(string[] args)
serverToUse = args[0];
}
- var builder = AdminClientConfigBuilder.Create().WithBootstrapServers(serverToUse);
+ try
+ {
+ var builder = AdminClientConfigBuilder.Create().WithBootstrapServers(serverToUse);
- //Properties props = new Properties();
- //props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
+ //Properties props = new Properties();
+ //props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
- using (var admin = KafkaAdminClient.Create(builder))
+ using (var admin = KafkaAdminClient.Create(builder))
+ {
+ CreateTopic(admin);
+ DeleteTopic(admin);
+ }
+ }
+ catch (Exception e)
{
- CreateTopic(admin);
- DeleteTopic(admin);
+ Environment.ExitCode = SharedKNetCore.ManageException(e);
}
}
diff --git a/tests/net/KNetTestKNetStreams/Program.cs b/tests/net/KNetTestKNetStreams/Program.cs
index 12bdeeb264..9560f287e1 100644
--- a/tests/net/KNetTestKNetStreams/Program.cs
+++ b/tests/net/KNetTestKNetStreams/Program.cs
@@ -16,9 +16,6 @@
* Refer to LICENSE for more information.
*/
-using Java.Util;
-using MASES.JCOBridge.C2JBridge;
-
using MASES.KNet.TestCommon;
using System;
using System.Text.RegularExpressions;
@@ -47,7 +44,7 @@ class Program
static void Main(string[] args)
{
- SharedKNetCore.CreateGlobalInstance();
+ SharedKNetCore.Create();
var appArgs = SharedKNetCore.FilteredArgs;
if (appArgs.Length != 0)
diff --git a/tests/net/KNetTestSerDes/Program.cs b/tests/net/KNetTestSerDes/Program.cs
index 5ba330cae6..f558f9e7dd 100644
--- a/tests/net/KNetTestSerDes/Program.cs
+++ b/tests/net/KNetTestSerDes/Program.cs
@@ -32,7 +32,7 @@ class Program
static void Main(string[] args)
{
- SharedKNetCore.CreateGlobalInstance();
+ SharedKNetCore.Create();
var appArgs = SharedKNetCore.FilteredArgs;
byte[] bb, bb1;
diff --git a/tests/net/KNetTestStreams/Program.cs b/tests/net/KNetTestStreams/Program.cs
index 281503c0c2..212d5b76fc 100644
--- a/tests/net/KNetTestStreams/Program.cs
+++ b/tests/net/KNetTestStreams/Program.cs
@@ -48,7 +48,7 @@ class Program
static void Main(string[] args)
{
- SharedKNetCore.CreateGlobalInstance();
+ SharedKNetCore.Create();
var appArgs = SharedKNetCore.FilteredArgs;
if (appArgs.Length != 0)