diff --git a/.github/workflows/docker_build_and_test.yml b/.github/workflows/docker_build_and_test.yml
index 695c08672fd87..1d30aa85ea25d 100644
--- a/.github/workflows/docker_build_and_test.yml
+++ b/.github/workflows/docker_build_and_test.yml
@@ -23,6 +23,7 @@ on:
description: Docker image type to build and test
options:
- "jvm"
+ - "native"
kafka_url:
description: Kafka url to be used to build the docker image
required: true
diff --git a/.github/workflows/docker_official_image_build_and_test.yml b/.github/workflows/docker_official_image_build_and_test.yml
new file mode 100644
index 0000000000000..a315cd0e0d2ac
--- /dev/null
+++ b/.github/workflows/docker_official_image_build_and_test.yml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Docker Official Image Build Test
+
+on:
+ workflow_dispatch:
+ inputs:
+ image_type:
+ type: choice
+ description: Docker image type to build and test
+ options:
+ - "jvm"
+ kafka_version:
+ description: Kafka version for the docker official image. This should be >=3.7.0
+ required: true
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up Python 3.10
+ uses: actions/setup-python@v3
+ with:
+ python-version: "3.10"
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install -r docker/requirements.txt
+ - name: Build image and run tests
+ working-directory: ./docker
+ run: |
+ python docker_official_image_build_test.py kafka/test -tag=test -type=${{ github.event.inputs.image_type }} -v=${{ github.event.inputs.kafka_version }}
+ - name: Run CVE scan
+ uses: aquasecurity/trivy-action@master
+ with:
+ image-ref: 'kafka/test:test'
+ format: 'table'
+ severity: 'CRITICAL,HIGH'
+ output: scan_report_${{ github.event.inputs.image_type }}.txt
+ exit-code: '1'
+ - name: Upload test report
+ if: always()
+ uses: actions/upload-artifact@v3
+ with:
+ name: report_${{ github.event.inputs.image_type }}.html
+ path: docker/test/report_${{ github.event.inputs.image_type }}.html
+ - name: Upload CVE scan report
+ if: always()
+ uses: actions/upload-artifact@v3
+ with:
+ name: scan_report_${{ github.event.inputs.image_type }}.txt
+ path: scan_report_${{ github.event.inputs.image_type }}.txt
diff --git a/.github/workflows/docker_promote.yml b/.github/workflows/docker_promote.yml
index 3449265877b8a..04872f9d59d3b 100644
--- a/.github/workflows/docker_promote.yml
+++ b/.github/workflows/docker_promote.yml
@@ -19,10 +19,10 @@ on:
workflow_dispatch:
inputs:
rc_docker_image:
- description: RC docker image that needs to be promoted (Example:- apache/kafka:3.6.0-rc0)
+ description: RC docker image that needs to be promoted (Example: apache/kafka:3.8.0-rc0 or apache/kafka-native:3.8.0-rc0)
required: true
promoted_docker_image:
- description: Docker image name of the promoted image (Example:- apache/kafka:3.6.0)
+ description: Docker image name of the promoted image (Example: apache/kafka:3.8.0 or apache/kafka-native:3.8.0)
required: true
jobs:
diff --git a/.github/workflows/docker_rc_release.yml b/.github/workflows/docker_rc_release.yml
index c7082dcac910c..22dd924b51b61 100644
--- a/.github/workflows/docker_rc_release.yml
+++ b/.github/workflows/docker_rc_release.yml
@@ -23,8 +23,9 @@ on:
description: Docker image type to be built and pushed
options:
- "jvm"
+ - "native"
rc_docker_image:
- description: RC docker image that needs to be built and pushed to Dockerhub (Example:- apache/kafka:3.6.0-rc0)
+ description: RC docker image that needs to be built and pushed to Dockerhub (Example: apache/kafka:3.8.0-rc0 or apache/kafka-native:3.8.0-rc0)
required: true
kafka_url:
description: Kafka url to be used to build the docker image
diff --git a/.github/workflows/prepare_docker_official_image_source.yml b/.github/workflows/prepare_docker_official_image_source.yml
new file mode 100644
index 0000000000000..45491045836f6
--- /dev/null
+++ b/.github/workflows/prepare_docker_official_image_source.yml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Prepare Docker Official Image Source
+
+on:
+ workflow_dispatch:
+ inputs:
+ image_type:
+ type: choice
+ description: Docker image type to build and test
+ options:
+ - "jvm"
+ kafka_version:
+ description: Kafka version for the docker official image. This should be >=3.7.0
+ required: true
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up Python 3.10
+ uses: actions/setup-python@v3
+ with:
+ python-version: "3.10"
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install -r docker/requirements.txt
+ - name: Build Docker Official Image Artifact
+ working-directory: ./docker
+ run: |
+ python prepare_docker_official_image_source.py -type=${{ github.event.inputs.image_type }} -v=${{ github.event.inputs.kafka_version }}
+ - name: Upload Docker Official Image Artifact
+ if: success()
+ uses: actions/upload-artifact@v4
+ with:
+ name: ${{ github.event.inputs.kafka_version }}
+ path: docker/docker_official_images/${{ github.event.inputs.kafka_version }}
diff --git a/.gitignore b/.gitignore
index 7dfe61c38cccf..015df8ead8327 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,5 @@ jmh-benchmarks/src/main/generated
storage/kafka-tiered-storage/
docker/test/report_*.html
+kafka.Kafka
__pycache__
diff --git a/Jenkinsfile b/Jenkinsfile
index 50b7f6a298e71..0a795637ff638 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -36,6 +36,12 @@ def doTest(env, target = "test") {
junit '**/build/test-results/**/TEST-*.xml'
}
+def runTestOnDevBranch(env) {
+ if (!isChangeRequest(env)) {
+ doTest(env)
+ }
+}
+
def doStreamsArchetype() {
echo 'Verify that Kafka Streams archetype compiles'
@@ -132,7 +138,7 @@ pipeline {
}
steps {
doValidation()
- doTest(env)
+ runTestOnDevBranch(env)
echo 'Skipping Kafka Streams archetype test for Java 11'
}
}
@@ -151,7 +157,7 @@ pipeline {
}
steps {
doValidation()
- doTest(env)
+ runTestOnDevBranch(env)
echo 'Skipping Kafka Streams archetype test for Java 17'
}
}
diff --git a/README.md b/README.md
index 27ce0dc0bce64..ab7dcd7685bde 100644
--- a/README.md
+++ b/README.md
@@ -227,11 +227,16 @@ There are two code quality analysis tools that we regularly run, spotbugs and ch
Checkstyle enforces a consistent coding style in Kafka.
You can run checkstyle using:
- ./gradlew checkstyleMain checkstyleTest
+ ./gradlew checkstyleMain checkstyleTest spotlessCheck
The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` and `reports/checkstyle/reports/test.html` files in the
subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.
+#### Spotless ####
+The import order is part of the static checks. Please run `spotlessApply` to fix the imports of Java code before filing a pull request:
+
+ ./gradlew spotlessApply
+
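
As an illustration (not part of the patch): assuming the spotless importOrder groups configured in this patch's build.gradle (kafka, org.apache.kafka, com, net, org, java, javax, then a catch-all group and static imports), a conforming import block would look roughly like the hypothetical Java file below. The imported classes are examples only, and in real code `removeUnusedImports()` would drop any that are unused.

    // Hypothetical example: import groups in the order spotlessApply would arrange them.
    import kafka.server.KafkaConfig;                        // group: kafka

    import org.apache.kafka.common.TopicPartition;          // group: org.apache.kafka

    import com.fasterxml.jackson.databind.ObjectMapper;     // group: com

    import net.sourceforge.argparse4j.ArgumentParsers;      // group: net

    import org.slf4j.Logger;                                // group: org

    import java.util.List;                                  // group: java

    import javax.security.auth.login.LoginException;        // group: javax

    import static java.util.Collections.emptyList;          // group: static imports ('\#')

    public class ImportOrderExample { }                     // imports above are unused; shown only for ordering
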
#### Spotbugs ####
Spotbugs uses static analysis to look for bugs in the code.
You can run spotbugs using:
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 4e63ff1fb972a..e1d9e540495d5 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -208,7 +208,7 @@ fi
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
- KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
+ KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# JMX port to use
@@ -340,9 +340,16 @@ CLASSPATH=${CLASSPATH#:}
# If Cygwin is detected, classpath is converted to Windows format.
(( WINDOWS_OS_FORMAT )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
-# Launch mode
-if [ "x$DAEMON_MODE" = "xtrue" ]; then
- nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+# If KAFKA_MODE=native, bring up Kafka in native mode.
+# This expects the native Kafka executable to be present at $base_dir/kafka.Kafka.
+# It is primarily used to run system tests against Kafka in native mode.
+if [[ "x$KAFKA_MODE" == "xnative" ]] && [[ "$*" == *"kafka.Kafka"* ]]; then
+ exec $base_dir/kafka.Kafka start --config "$2" $KAFKA_LOG4J_CMD_OPTS $KAFKA_JMX_OPTS $KAFKA_OPTS
else
- exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
+ # Launch mode
+ if [ "x$DAEMON_MODE" = "xtrue" ]; then
+ nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+ else
+ exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
+ fi
fi
diff --git a/build.gradle b/build.gradle
index 031389f62c6e9..a2a6531d29a62 100644
--- a/build.gradle
+++ b/build.gradle
@@ -47,7 +47,9 @@ plugins {
// Updating the shadow plugin version to 8.1.1 causes issue with signing and publishing the shadowed
// artifacts - see https://github.com/johnrengelman/shadow/issues/901
id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
- id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer require Java 11 at compile time, so we can't upgrade until AK 4.0
+ // Spotless 6.14.0 and newer require JRE 11 or higher.
+ // See: https://github.com/diffplug/spotless/tree/main/plugin-gradle#requirements
+ id 'com.diffplug.spotless' version "6.13.0" apply false
}
ext {
@@ -176,6 +178,8 @@ allprojects {
options.memberLevel = JavadocMemberLevel.PUBLIC // Document only public members/API
// Turn off doclint for now, see https://blog.joda.org/2014/02/turning-off-doclint-in-jdk-8-javadoc.html for rationale
options.addStringOption('Xdoclint:none', '-quiet')
+ // Javadoc warnings should fail the build in JDK 15+ https://bugs.openjdk.org/browse/JDK-8200363
+ options.addBooleanOption('Werror', JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_15))
// The URL structure was changed to include the locale after Java 8
if (JavaVersion.current().isJava11Compatible())
@@ -196,6 +200,9 @@ def determineCommitId() {
}
}
+def spotlessApplyModules = ['']
+
+
apply from: file('wrapper.gradle')
if (repo != null) {
@@ -233,7 +240,8 @@ if (repo != null) {
'**/generated/**',
'clients/src/test/resources/serializedData/*',
'docker/test/fixtures/secrets/*',
- 'docker/examples/fixtures/secrets/*'
+ 'docker/examples/fixtures/secrets/*',
+ 'docker/docker_official_images/.gitkeep'
])
}
} else {
@@ -304,10 +312,12 @@ subprojects {
addParametersForTests(name, options)
}
- // We should only set this if Java version is < 9 (--release is recommended for >= 9), but the Scala plugin for IntelliJ sets
- // `-target` incorrectly if this is unset
- sourceCompatibility = minJavaVersion
- targetCompatibility = minJavaVersion
+ java {
+ // We should only set this if Java version is < 9 (--release is recommended for >= 9), but the Scala plugin for IntelliJ sets
+ // `-target` incorrectly if this is unset
+ sourceCompatibility = minJavaVersion
+ targetCompatibility = minJavaVersion
+ }
if (shouldPublish) {
@@ -356,7 +366,7 @@ subprojects {
artifact task
}
- artifactId = archivesBaseName
+ artifactId = base.archivesName.get()
pom {
name = 'Apache Kafka'
url = 'https://kafka.apache.org'
@@ -609,6 +619,8 @@ subprojects {
task docsJar(dependsOn: javadocJar)
+ test.dependsOn('javadoc')
+
task systemTestLibs(dependsOn: jar)
if (!sourceSets.test.allSource.isEmpty()) {
@@ -741,8 +753,8 @@ subprojects {
}
test.dependsOn('spotbugsMain')
- tasks.withType(com.github.spotbugs.snom.SpotBugsTask) {
- reports {
+ tasks.withType(com.github.spotbugs.snom.SpotBugsTask).configureEach {
+ reports.configure {
// Continue supporting `xmlFindBugsReport` for compatibility
xml.enabled(project.hasProperty('xmlSpotBugsReport') || project.hasProperty('xmlFindBugsReport'))
html.enabled(!project.hasProperty('xmlSpotBugsReport') && !project.hasProperty('xmlFindBugsReport'))
@@ -786,6 +798,16 @@ subprojects {
skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
skipConfigurations = [ "zinc" ]
}
+
+ if (project.name in spotlessApplyModules) {
+ apply plugin: 'com.diffplug.spotless'
+ spotless {
+ java {
+ importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 'java', 'javax', '', '\\#')
+ removeUnusedImports()
+ }
+ }
+ }
}
gradle.taskGraph.whenReady { taskGraph ->
@@ -850,7 +872,9 @@ tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" })
tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
project(':server') {
- archivesBaseName = "kafka-server"
+ base {
+ archivesName = "kafka-server"
+ }
dependencies {
implementation project(':clients')
@@ -922,7 +946,10 @@ project(':core') {
}
if (userEnableTestCoverage)
apply plugin: "org.scoverage"
- archivesBaseName = "kafka_${versions.baseScala}"
+
+ base {
+ archivesName = "kafka_${versions.baseScala}"
+ }
configurations {
generator
@@ -1250,7 +1277,9 @@ project(':core') {
}
project(':metadata') {
- archivesBaseName = "kafka-metadata"
+ base {
+ archivesName = "kafka-metadata"
+ }
configurations {
generator
@@ -1317,7 +1346,9 @@ project(':metadata') {
}
project(':group-coordinator') {
- archivesBaseName = "kafka-group-coordinator"
+ base {
+ archivesName = "kafka-group-coordinator"
+ }
configurations {
generator
@@ -1384,7 +1415,9 @@ project(':group-coordinator') {
}
project(':transaction-coordinator') {
- archivesBaseName = "kafka-transaction-coordinator"
+ base {
+ archivesName = "kafka-transaction-coordinator"
+ }
sourceSets {
main {
@@ -1405,7 +1438,9 @@ project(':transaction-coordinator') {
}
project(':examples') {
- archivesBaseName = "kafka-examples"
+ base {
+ archivesName = "kafka-examples"
+ }
dependencies {
implementation project(':clients')
@@ -1435,7 +1470,9 @@ project(':generator') {
}
project(':clients') {
- archivesBaseName = "kafka-clients"
+ base {
+ archivesName = "kafka-clients"
+ }
configurations {
generator
@@ -1610,7 +1647,9 @@ project(':clients') {
}
project(':raft') {
- archivesBaseName = "kafka-raft"
+ base {
+ archivesName = "kafka-raft"
+ }
configurations {
generator
@@ -1706,7 +1745,9 @@ project(':raft') {
}
project(':server-common') {
- archivesBaseName = "kafka-server-common"
+ base {
+ archivesName = "kafka-server-common"
+ }
dependencies {
api project(':clients')
@@ -1764,7 +1805,9 @@ project(':server-common') {
}
project(':storage:storage-api') {
- archivesBaseName = "kafka-storage-api"
+ base {
+ archivesName = "kafka-storage-api"
+ }
dependencies {
implementation project(':clients')
@@ -1832,7 +1875,9 @@ project(':storage:storage-api') {
}
project(':storage') {
- archivesBaseName = "kafka-storage"
+ base {
+ archivesName = "kafka-storage"
+ }
configurations {
generator
@@ -1954,7 +1999,9 @@ project(':storage') {
}
project(':tools:tools-api') {
- archivesBaseName = "kafka-tools-api"
+ base {
+ archivesName = "kafka-tools-api"
+ }
dependencies {
implementation project(':clients')
@@ -2009,7 +2056,10 @@ project(':tools:tools-api') {
}
project(':tools') {
- archivesBaseName = "kafka-tools"
+ base {
+ archivesName = "kafka-tools"
+ }
+
dependencies {
implementation project(':clients')
implementation project(':storage')
@@ -2073,7 +2123,9 @@ project(':tools') {
}
project(':trogdor') {
- archivesBaseName = "trogdor"
+ base {
+ archivesName = "trogdor"
+ }
dependencies {
implementation project(':clients')
@@ -2123,7 +2175,9 @@ project(':trogdor') {
}
project(':shell') {
- archivesBaseName = "kafka-shell"
+ base {
+ archivesName = "kafka-shell"
+ }
dependencies {
implementation libs.argparse4j
@@ -2173,7 +2227,10 @@ project(':shell') {
}
project(':streams') {
- archivesBaseName = "kafka-streams"
+ base {
+ archivesName = "kafka-streams"
+ }
+
ext.buildStreamsVersionFileName = "kafka-streams-version.properties"
configurations {
@@ -2335,7 +2392,11 @@ project(':streams') {
project(':streams:streams-scala') {
apply plugin: 'scala'
- archivesBaseName = "kafka-streams-scala_${versions.baseScala}"
+
+ base {
+ archivesName = "kafka-streams-scala_${versions.baseScala}"
+ }
+
dependencies {
api project(':streams')
@@ -2397,7 +2458,9 @@ project(':streams:streams-scala') {
}
project(':streams:test-utils') {
- archivesBaseName = "kafka-streams-test-utils"
+ base {
+ archivesName = "kafka-streams-test-utils"
+ }
dependencies {
api project(':streams')
@@ -2432,7 +2495,9 @@ project(':streams:test-utils') {
}
project(':streams:examples') {
- archivesBaseName = "kafka-streams-examples"
+ base {
+ archivesName = "kafka-streams-examples"
+ }
dependencies {
// this dependency should be removed after we unify data API
@@ -2469,7 +2534,9 @@ project(':streams:examples') {
}
project(':streams:upgrade-system-tests-0100') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-0100"
+ }
dependencies {
testImplementation(libs.kafkaStreams_0100) {
@@ -2485,7 +2552,9 @@ project(':streams:upgrade-system-tests-0100') {
}
project(':streams:upgrade-system-tests-0101') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-0101"
+ }
dependencies {
testImplementation(libs.kafkaStreams_0101) {
@@ -2501,7 +2570,9 @@ project(':streams:upgrade-system-tests-0101') {
}
project(':streams:upgrade-system-tests-0102') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-0102"
+ }
dependencies {
testImplementation libs.kafkaStreams_0102
@@ -2514,7 +2585,9 @@ project(':streams:upgrade-system-tests-0102') {
}
project(':streams:upgrade-system-tests-0110') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-0110"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-0110"
+ }
dependencies {
testImplementation libs.kafkaStreams_0110
@@ -2527,7 +2600,9 @@ project(':streams:upgrade-system-tests-0110') {
}
project(':streams:upgrade-system-tests-10') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-10"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-10"
+ }
dependencies {
testImplementation libs.kafkaStreams_10
@@ -2540,7 +2615,9 @@ project(':streams:upgrade-system-tests-10') {
}
project(':streams:upgrade-system-tests-11') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-11"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-11"
+ }
dependencies {
testImplementation libs.kafkaStreams_11
@@ -2553,7 +2630,9 @@ project(':streams:upgrade-system-tests-11') {
}
project(':streams:upgrade-system-tests-20') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-20"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-20"
+ }
dependencies {
testImplementation libs.kafkaStreams_20
@@ -2566,7 +2645,9 @@ project(':streams:upgrade-system-tests-20') {
}
project(':streams:upgrade-system-tests-21') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-21"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-21"
+ }
dependencies {
testImplementation libs.kafkaStreams_21
@@ -2579,7 +2660,9 @@ project(':streams:upgrade-system-tests-21') {
}
project(':streams:upgrade-system-tests-22') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-22"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-22"
+ }
dependencies {
testImplementation libs.kafkaStreams_22
@@ -2592,7 +2675,9 @@ project(':streams:upgrade-system-tests-22') {
}
project(':streams:upgrade-system-tests-23') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-23"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-23"
+ }
dependencies {
testImplementation libs.kafkaStreams_23
@@ -2605,7 +2690,9 @@ project(':streams:upgrade-system-tests-23') {
}
project(':streams:upgrade-system-tests-24') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-24"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-24"
+ }
dependencies {
testImplementation libs.kafkaStreams_24
@@ -2618,7 +2705,9 @@ project(':streams:upgrade-system-tests-24') {
}
project(':streams:upgrade-system-tests-25') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-25"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-25"
+ }
dependencies {
testImplementation libs.kafkaStreams_25
@@ -2631,7 +2720,9 @@ project(':streams:upgrade-system-tests-25') {
}
project(':streams:upgrade-system-tests-26') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-26"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-26"
+ }
dependencies {
testImplementation libs.kafkaStreams_26
@@ -2644,7 +2735,9 @@ project(':streams:upgrade-system-tests-26') {
}
project(':streams:upgrade-system-tests-27') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-27"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-27"
+ }
dependencies {
testImplementation libs.kafkaStreams_27
@@ -2657,7 +2750,9 @@ project(':streams:upgrade-system-tests-27') {
}
project(':streams:upgrade-system-tests-28') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-28"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-28"
+ }
dependencies {
testImplementation libs.kafkaStreams_28
@@ -2670,7 +2765,9 @@ project(':streams:upgrade-system-tests-28') {
}
project(':streams:upgrade-system-tests-30') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-30"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-30"
+ }
dependencies {
testImplementation libs.kafkaStreams_30
@@ -2683,7 +2780,9 @@ project(':streams:upgrade-system-tests-30') {
}
project(':streams:upgrade-system-tests-31') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-31"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-31"
+ }
dependencies {
testImplementation libs.kafkaStreams_31
@@ -2696,7 +2795,9 @@ project(':streams:upgrade-system-tests-31') {
}
project(':streams:upgrade-system-tests-32') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-32"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-32"
+ }
dependencies {
testImplementation libs.kafkaStreams_32
@@ -2709,7 +2810,9 @@ project(':streams:upgrade-system-tests-32') {
}
project(':streams:upgrade-system-tests-33') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-33"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-33"
+ }
dependencies {
testImplementation libs.kafkaStreams_33
@@ -2722,7 +2825,9 @@ project(':streams:upgrade-system-tests-33') {
}
project(':streams:upgrade-system-tests-34') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-34"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-34"
+ }
dependencies {
testImplementation libs.kafkaStreams_34
@@ -2735,7 +2840,9 @@ project(':streams:upgrade-system-tests-34') {
}
project(':streams:upgrade-system-tests-35') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-35"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-35"
+ }
dependencies {
testImplementation libs.kafkaStreams_35
@@ -2748,7 +2855,9 @@ project(':streams:upgrade-system-tests-35') {
}
project(':streams:upgrade-system-tests-36') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-36"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-36"
+ }
dependencies {
testImplementation libs.kafkaStreams_36
@@ -2761,7 +2870,9 @@ project(':streams:upgrade-system-tests-36') {
}
project(':streams:upgrade-system-tests-37') {
- archivesBaseName = "kafka-streams-upgrade-system-tests-37"
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-37"
+ }
dependencies {
testImplementation libs.kafkaStreams_37
@@ -2846,7 +2957,9 @@ project(':jmh-benchmarks') {
}
project(':log4j-appender') {
- archivesBaseName = "kafka-log4j-appender"
+ base {
+ archivesName = "kafka-log4j-appender"
+ }
dependencies {
implementation project(':clients')
@@ -2865,7 +2978,9 @@ project(':log4j-appender') {
}
project(':connect:api') {
- archivesBaseName = "connect-api"
+ base {
+ archivesName = "connect-api"
+ }
dependencies {
api project(':clients')
@@ -2900,7 +3015,9 @@ project(':connect:api') {
}
project(':connect:transforms') {
- archivesBaseName = "connect-transforms"
+ base {
+ archivesName = "connect-transforms"
+ }
dependencies {
api project(':connect:api')
@@ -2936,7 +3053,9 @@ project(':connect:transforms') {
}
project(':connect:json') {
- archivesBaseName = "connect-json"
+ base {
+ archivesName = "connect-json"
+ }
dependencies {
api project(':connect:api')
@@ -2980,7 +3099,9 @@ project(':connect:runtime') {
swagger
}
- archivesBaseName = "connect-runtime"
+ base {
+ archivesName = "connect-runtime"
+ }
dependencies {
// connect-runtime is used in tests, use `api` for modules below for backwards compatibility even though
@@ -3122,7 +3243,9 @@ project(':connect:runtime') {
}
project(':connect:file') {
- archivesBaseName = "connect-file"
+ base {
+ archivesName = "connect-file"
+ }
dependencies {
implementation project(':connect:api')
@@ -3162,7 +3285,9 @@ project(':connect:file') {
}
project(':connect:basic-auth-extension') {
- archivesBaseName = "connect-basic-auth-extension"
+ base {
+ archivesName = "connect-basic-auth-extension"
+ }
dependencies {
implementation project(':connect:api')
@@ -3202,7 +3327,9 @@ project(':connect:basic-auth-extension') {
}
project(':connect:mirror') {
- archivesBaseName = "connect-mirror"
+ base {
+ archivesName = "connect-mirror"
+ }
dependencies {
implementation project(':connect:api')
@@ -3290,7 +3417,9 @@ project(':connect:mirror') {
}
project(':connect:mirror-client') {
- archivesBaseName = "connect-mirror-client"
+ base {
+ archivesName = "connect-mirror-client"
+ }
dependencies {
implementation project(':clients')
@@ -3325,7 +3454,9 @@ project(':connect:mirror-client') {
}
project(':connect:test-plugins') {
- archivesBaseName = "connect-test-plugins"
+ base {
+ archivesName = "connect-test-plugins"
+ }
dependencies {
api project(':connect:api')
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index aff659638928b..61eb7e4b245fd 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -82,6 +82,8 @@
+
+
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index ed6c53a322ba3..6724ea9bf3ba0 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -130,5 +130,6 @@
+
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ed0f34e4fffad..ab6177961f51e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -211,6 +211,11 @@
+
+
+
+
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a8bc6351db3fa..fc6995dadfe7e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -43,6 +43,8 @@
+
@@ -99,7 +101,7 @@
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
+ files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
@@ -143,7 +145,7 @@
+ files="(DistributedHerder|DistributedConfig|KafkaConfigBackingStore|IncrementalCooperativeAssignor).java"/>
+ files="(JsonConverter|ConnectHeaders).java"/>
+ files="(KafkaConfigBackingStore|ConnectMetricsRegistry).java"/>
@@ -347,7 +349,7 @@
+ files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -?,? +?,? @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
- return applicationEventHandler.addAndGet(listOffsetsEvent)
- .entrySet()
- .stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- entry -> entry.getValue().buildOffsetAndTimestamp()));
+ try {
+ return applicationEventHandler.addAndGet(listOffsetsEvent)
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().buildOffsetAndTimestamp()));
+ } catch (TimeoutException e) {
+ throw new TimeoutException("Failed to get offsets by times in " + timeout.toMillis() + "ms");
+ }
} finally {
release();
}
@@ -1136,12 +1140,16 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
 Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
- offsetAndTimestampMap = applicationEventHandler.addAndGet(listOffsetsEvent);
- return offsetAndTimestampMap.entrySet()
+ try {
+ offsetAndTimestampMap = applicationEventHandler.addAndGet(listOffsetsEvent);
+ return offsetAndTimestampMap.entrySet()
.stream()
.collect(Collectors.toMap(
- Map.Entry::getKey,
- entry -> entry.getValue().offset()));
+ Map.Entry::getKey,
+ entry -> entry.getValue().offset()));
+ } catch (TimeoutException e) {
+ throw new TimeoutException("Failed to get offsets by times in " + timeout.toMillis() + "ms");
+ }
} finally {
release();
}
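
A brief usage sketch of the behavior introduced above (the topic partition, timeout, and helper are assumptions, not part of the patch): when an offset lookup cannot complete within the supplied Duration, the async consumer now surfaces an org.apache.kafka.common.errors.TimeoutException whose message reports the timeout in milliseconds.

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;

    public class OffsetsForTimesExample {
        // Looks up the first offset whose timestamp is >= timestampMs, bounding the wait to 5 seconds.
        static Long offsetAt(Consumer<byte[], byte[]> consumer, TopicPartition tp, long timestampMs) {
            try {
                Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMs), Duration.ofSeconds(5));
                OffsetAndTimestamp found = result.get(tp);
                return found == null ? null : found.offset();
            } catch (TimeoutException e) {
                // With this change, the message reports the timeout, e.g. "Failed to get offsets by times in 5000ms".
                throw new IllegalStateException("Offset lookup timed out", e);
            }
        }
    }
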
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 8959345bffdcf..5244af9c827d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.RecordDeserializationException.DeserializationExceptionOrigin;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -311,25 +312,39 @@ ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers,
 Optional<Integer> leaderEpoch,
TimestampType timestampType,
Record record) {
+ ByteBuffer keyBytes = record.key();
+ ByteBuffer valueBytes = record.value();
+ Headers headers = new RecordHeaders(record.headers());
+ K key;
+ V value;
try {
- long offset = record.offset();
- long timestamp = record.timestamp();
- Headers headers = new RecordHeaders(record.headers());
- ByteBuffer keyBytes = record.key();
- K key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
- ByteBuffer valueBytes = record.value();
- V value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
- return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
- timestamp, timestampType,
- keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
- valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
- key, value, headers, leaderEpoch);
+ key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
} catch (RuntimeException e) {
- log.error("Deserializers with error: {}", deserializers);
- throw new RecordDeserializationException(partition, record.offset(),
- "Error deserializing key/value for partition " + partition +
- " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
+ log.error("Key Deserializers with error: {}", deserializers);
+ throw newRecordDeserializationException(DeserializationExceptionOrigin.KEY, partition, timestampType, record, e, headers);
}
+ try {
+ value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+ } catch (RuntimeException e) {
+ log.error("Value Deserializers with error: {}", deserializers);
+ throw newRecordDeserializationException(DeserializationExceptionOrigin.VALUE, partition, timestampType, record, e, headers);
+ }
+ return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),
+ record.timestamp(), timestampType,
+ keyBytes == null ? ConsumerRecord.NULL_SIZE : keyBytes.remaining(),
+ valueBytes == null ? ConsumerRecord.NULL_SIZE : valueBytes.remaining(),
+ key, value, headers, leaderEpoch);
+ }
+
+ private static RecordDeserializationException newRecordDeserializationException(DeserializationExceptionOrigin origin,
+ TopicPartition partition,
+ TimestampType timestampType,
+ Record record,
+ RuntimeException e,
+ Headers headers) {
+ return new RecordDeserializationException(origin, partition, record.offset(), record.timestamp(), timestampType, record.key(), record.value(), headers,
+ "Error deserializing " + origin.name() + " for partition " + partition + " at offset " + record.offset()
+ + ". If needed, please seek past the record to continue consumption.", e);
}
private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
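
A consumer-side sketch of how the enriched exception introduced above can be used (the handling policy and names are assumptions): on a deserialization failure, check whether the key or the value failed via origin(), then seek past the bad record so consumption can continue, as the exception message suggests.

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.errors.RecordDeserializationException;

    import java.time.Duration;

    public class SkipBadRecordsExample {
        // Polls in a loop; when key or value deserialization fails, reports which side failed
        // (KEY or VALUE) and seeks past the offending record.
        static void pollSkippingBadRecords(Consumer<String, String> consumer) {
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
                } catch (RecordDeserializationException e) {
                    System.err.printf("Failed to deserialize %s for %s at offset %d%n",
                        e.origin(), e.topicPartition(), e.offset());
                    consumer.seek(e.topicPartition(), e.offset() + 1);
                }
            }
        }
    }
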
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index f65a0642a4300..d31d412c65503 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -45,6 +45,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;
+
/**
 * <p>Manages the request creation and response handling for the heartbeat. The module creates a
* {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to
@@ -208,7 +209,11 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat));
}
- boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
+ // Case 1: The member is leaving
+ boolean heartbeatNow = membershipManager.state() == MemberState.LEAVING ||
+ // Case 2: The member state indicates it should send a heartbeat without waiting for the interval, and there is no heartbeat request currently in-flight
+ (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight());
+
if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}
@@ -383,7 +388,15 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
case UNRELEASED_INSTANCE_ID:
logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",
membershipManager.groupInstanceId().orElse("null"), errorMessage);
- handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
+ handleFatalFailure(error.exception(errorMessage));
+ break;
+
+ case FENCED_INSTANCE_ID:
+ logger.error("GroupHeartbeatRequest failed due to fenced instance id {}: {}. " +
+ "This is expected in the case that the member was removed from the group " +
+ "by an admin client, and another member joined using the same group instance id.",
+ membershipManager.groupInstanceId().orElse("null"), errorMessage);
+ handleFatalFailure(error.exception(errorMessage));
break;
case INVALID_REQUEST:
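
For context on the new FENCED_INSTANCE_ID branch above: this error can only occur with static membership. Below is a minimal sketch of enabling static membership on a consumer (all property values are placeholders); if another member later joins with the same group.instance.id, this member's next heartbeat fails with the fatal fenced-instance error handled above.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Properties;

    public class StaticMembershipExample {
        static KafkaConsumer<String, String> newStaticMember() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder
            // Static membership: another instance reusing this id fences the old member.
            props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "member-1");         // placeholder
            // The heartbeat manager above belongs to the new consumer group protocol.
            props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            return new KafkaConsumer<>(props);
        }
    }
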
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 84b77ef5f40db..820adbdb5fbfa 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -385,12 +385,13 @@ public boolean equals(Object o) {
Objects.equals(internalTopics, cluster.internalTopics) &&
Objects.equals(controller, cluster.controller) &&
Objects.equals(partitionsByTopicPartition, cluster.partitionsByTopicPartition) &&
- Objects.equals(clusterResource, cluster.clusterResource);
+ Objects.equals(clusterResource, cluster.clusterResource) &&
+ Objects.equals(topicIds, cluster.topicIds);
}
@Override
public int hashCode() {
return Objects.hash(isBootstrapConfigured, nodes, unauthorizedTopics, invalidTopics, internalTopics, controller,
- partitionsByTopicPartition, clusterResource);
+ partitionsByTopicPartition, clusterResource, topicIds);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 29bedff0cf19b..0577b4200a3bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.common;
+import java.util.Arrays;
+import java.util.Objects;
+
/**
* This is used to describe per-partition state in the MetadataResponse.
*/
@@ -88,6 +91,29 @@ public Node[] offlineReplicas() {
return offlineReplicas;
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, partition, leader, Arrays.hashCode(replicas),
+ Arrays.hashCode(inSyncReplicas), Arrays.hashCode(offlineReplicas));
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PartitionInfo other = (PartitionInfo) obj;
+ return Objects.equals(topic, other.topic) &&
+ partition == other.partition &&
+ Objects.equals(leader, other.leader) &&
+ Objects.deepEquals(replicas, other.replicas) &&
+ Objects.deepEquals(inSyncReplicas, other.inSyncReplicas) &&
+ Objects.deepEquals(offlineReplicas, other.offlineReplicas);
+ }
+
@Override
public String toString() {
return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)",
diff --git a/clients/src/main/java/org/apache/kafka/common/ShareGroupState.java b/clients/src/main/java/org/apache/kafka/common/ShareGroupState.java
new file mode 100644
index 0000000000000..716421f3dea2a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/ShareGroupState.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * The share group state.
+ */
+public enum ShareGroupState {
+ UNKNOWN("Unknown"),
+ STABLE("Stable"),
+ DEAD("Dead"),
+ EMPTY("Empty");
+
+ private final static Map<String, ShareGroupState> NAME_TO_ENUM = Arrays.stream(values())
+ .collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
+
+ private final String name;
+
+ ShareGroupState(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Case-insensitive share group state lookup by string name.
+ */
+ public static ShareGroupState parse(String name) {
+ ShareGroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT));
+ return state == null ? UNKNOWN : state;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
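
A small usage sketch of the new enum (the values passed in are illustrative): parse() is case-insensitive over the display names and falls back to UNKNOWN for unrecognized input instead of throwing.

    import org.apache.kafka.common.ShareGroupState;

    public class ShareGroupStateExample {
        public static void main(String[] args) {
            System.out.println(ShareGroupState.parse("stable"));      // prints "Stable"
            System.out.println(ShareGroupState.parse("EMPTY"));       // prints "Empty"
            System.out.println(ShareGroupState.parse("not-a-state")); // prints "Unknown" rather than throwing
        }
    }
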
diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 791a4602cf0f1..767fa9aca015f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -17,6 +17,9 @@
package org.apache.kafka.common.config.internals;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import java.util.Collections;
import java.util.List;
@@ -30,39 +33,19 @@
public class BrokerSecurityConfigs {
public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
- public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG = "sasl.kerberos.principal.to.local.rules";
- public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
- public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms";
- public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
- public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
- public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms";
- public static final long DEFAULT_CONNECTIONS_MAX_REAUTH_MS = 0L;
- public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288;
- public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size";
- public static final String SSL_ALLOW_DN_CHANGES_CONFIG = "ssl.allow.dn.changes";
- public static final boolean DEFAULT_SSL_ALLOW_DN_CHANGES_VALUE = false;
- public static final String SSL_ALLOW_SAN_CHANGES_CONFIG = "ssl.allow.san.changes";
- public static final boolean DEFAULT_SSL_ALLOW_SAN_CHANGES_VALUE = false;
-
- public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
- "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
- "authorization. If no principal builder is defined, the default behavior depends " +
- "on the security protocol in use. For SSL authentication, the principal will be derived using the " +
- "rules defined by " + SSL_PRINCIPAL_MAPPING_RULES_CONFIG + " applied on the distinguished " +
- "name from the client certificate if one is provided; otherwise, if client authentication is not required, " +
- "the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " +
- "rules defined by " + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + " if GSSAPI is in use, " +
- "and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.";
+ public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
+ public static final String DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = "DEFAULT";
public static final String SSL_PRINCIPAL_MAPPING_RULES_DOC = "A list of rules for mapping from distinguished name" +
" from the client certificate to short name. The rules are evaluated in order and the first rule that matches" +
" a principal name is used to map it to a short name. Any later rules in the list are ignored. By default," +
" distinguished name of the X.500 certificate will be the principal. For more details on the format please" +
" see security authorization and acls. Note that this configuration is ignored" +
" if an extension of KafkaPrincipalBuilder is provided by the " + PRINCIPAL_BUILDER_CLASS_CONFIG + "" +
- " configuration.";
- public static final String DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = "DEFAULT";
+ " configuration.";
+ public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG = "sasl.kerberos.principal.to.local.rules";
+ public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList(DEFAULT_SSL_PRINCIPAL_MAPPING_RULES);
public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " +
"names to short names (typically operating system usernames). The rules are evaluated in order and the " +
"first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " +
@@ -70,8 +53,20 @@ public class BrokerSecurityConfigs {
"to {username}. For more details on the format please see " +
"security authorization and acls. Note that this configuration is ignored if an extension of " +
"KafkaPrincipalBuilder is provided by the " + PRINCIPAL_BUILDER_CLASS_CONFIG + " configuration.";
- public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
+ public static final Class<? extends KafkaPrincipalBuilder> PRINCIPAL_BUILDER_CLASS_DEFAULT = DefaultKafkaPrincipalBuilder.class;
+ public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
+ "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
+ "authorization. If no principal builder is defined, the default behavior depends " +
+ "on the security protocol in use. For SSL authentication, the principal will be derived using the " +
+ "rules defined by " + SSL_PRINCIPAL_MAPPING_RULES_CONFIG + " applied on the distinguished " +
+ "name from the client certificate if one is provided; otherwise, if client authentication is not required, " +
+ "the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " +
+ "rules defined by " + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + " if GSSAPI is in use, " +
+ "and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.";
+
+ public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
+ public static final String SSL_CLIENT_AUTH_DEFAULT = SslClientAuth.NONE.toString();
public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
+ " The following settings are common: "
+ "
"
@@ -81,29 +76,39 @@ public class BrokerSecurityConfigs {
+ "
ssl.client.auth=none This means client authentication is not needed."
+ "
";
+ public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms";
+ public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(SaslConfigs.GSSAPI_MECHANISM);
public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL mechanisms enabled in the Kafka server. "
+ "The list may contain any mechanism for which a security provider is available. "
+ "Only GSSAPI is enabled by default.";
- public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(SaslConfigs.GSSAPI_MECHANISM);
+ public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG = "sasl.server.callback.handler.class";
public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS_DOC = "The fully qualified name of a SASL server callback handler "
+ "class that implements the AuthenticateCallbackHandler interface. Server callback handlers must be prefixed with "
+ "listener prefix and SASL mechanism name in lower-case. For example, "
+ "listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler.";
+ public static final String CONNECTIONS_MAX_REAUTH_MS_CONFIG = "connections.max.reauth.ms";
+ public static final long DEFAULT_CONNECTIONS_MAX_REAUTH_MS = 0L;
public static final String CONNECTIONS_MAX_REAUTH_MS_DOC = "When explicitly set to a positive number (the default is 0, not a positive number), "
+ "a session lifetime that will not exceed the configured value will be communicated to v2.2.0 or later clients when they authenticate. "
+ "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
+ "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
+ "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+ public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size";
+ public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288;
public static final String SASL_SERVER_MAX_RECEIVE_SIZE_DOC = "The maximum receive size allowed before and during initial SASL authentication." +
" Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow upto 512KB by default for custom SASL mechanisms. In practice," +
" PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.";
+ public static final String SSL_ALLOW_DN_CHANGES_CONFIG = "ssl.allow.dn.changes";
+ public static final boolean DEFAULT_SSL_ALLOW_DN_CHANGES_VALUE = false;
public static final String SSL_ALLOW_DN_CHANGES_DOC = "Indicates whether changes to the certificate distinguished name should be allowed during" +
" a dynamic reconfiguration of certificates or not.";
+ public static final String SSL_ALLOW_SAN_CHANGES_CONFIG = "ssl.allow.san.changes";
+ public static final boolean DEFAULT_SSL_ALLOW_SAN_CHANGES_VALUE = false;
public static final String SSL_ALLOW_SAN_CHANGES_DOC = "Indicates whether changes to the certificate subject alternative names should be allowed during " +
"a dynamic reconfiguration of certificates or not.";
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/FencedStateEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/FencedStateEpochException.java
new file mode 100644
index 0000000000000..1e74bba199402
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/FencedStateEpochException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when the share coordinator rejected the request because the share-group state epoch did not match.
+ */
+public class FencedStateEpochException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public FencedStateEpochException(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRecordStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRecordStateException.java
new file mode 100644
index 0000000000000..ae0fef5edeaef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRecordStateException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when the acknowledgement of delivery of a record could not be completed because the record
+ * state is invalid.
+ */
+public class InvalidRecordStateException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InvalidRecordStateException(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidShareSessionEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidShareSessionEpochException.java
new file mode 100644
index 0000000000000..e261d8b7a8e88
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidShareSessionEpochException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when the share session epoch is invalid.
+ */
+public class InvalidShareSessionEpochException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidShareSessionEpochException(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
index a15df6c7ff52c..aee57c47d28de 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordDeserializationException.java
@@ -16,7 +16,12 @@
*/
package org.apache.kafka.common.errors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+
+import java.nio.ByteBuffer;
/**
* This exception is raised for any error that occurs while deserializing records received by the consumer using
@@ -24,14 +29,61 @@
*/
public class RecordDeserializationException extends SerializationException {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ public enum DeserializationExceptionOrigin {
+ KEY,
+ VALUE
+ }
+
+ private final DeserializationExceptionOrigin origin;
private final TopicPartition partition;
private final long offset;
+ private final TimestampType timestampType;
+ private final long timestamp;
+ private final ByteBuffer keyBuffer;
+ private final ByteBuffer valueBuffer;
+ private final Headers headers;
- public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
+ @Deprecated
+ public RecordDeserializationException(TopicPartition partition,
+ long offset,
+ String message,
+ Throwable cause) {
super(message, cause);
+ this.origin = null;
this.partition = partition;
this.offset = offset;
+ this.timestampType = TimestampType.NO_TIMESTAMP_TYPE;
+ this.timestamp = ConsumerRecord.NO_TIMESTAMP;
+ this.keyBuffer = null;
+ this.valueBuffer = null;
+ this.headers = null;
+ }
+
+ public RecordDeserializationException(DeserializationExceptionOrigin origin,
+ TopicPartition partition,
+ long offset,
+ long timestamp,
+ TimestampType timestampType,
+ ByteBuffer keyBuffer,
+ ByteBuffer valueBuffer,
+ Headers headers,
+ String message,
+ Throwable cause) {
+ super(message, cause);
+ this.origin = origin;
+ this.offset = offset;
+ this.timestampType = timestampType;
+ this.timestamp = timestamp;
+ this.partition = partition;
+ this.keyBuffer = keyBuffer;
+ this.valueBuffer = valueBuffer;
+ this.headers = headers;
+ }
+
+ public DeserializationExceptionOrigin origin() {
+ return origin;
}
public TopicPartition topicPartition() {
@@ -41,4 +93,24 @@ public TopicPartition topicPartition() {
public long offset() {
return offset;
}
+
+ public TimestampType timestampType() {
+ return timestampType;
+ }
+
+ public long timestamp() {
+ return timestamp;
+ }
+
+ public ByteBuffer keyBuffer() {
+ return keyBuffer;
+ }
+
+ public ByteBuffer valueBuffer() {
+ return valueBuffer;
+ }
+
+ public Headers headers() {
+ return headers;
+ }
}
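
The extra fields give applications access to the raw bytes of a record that failed to deserialize. Below is a minimal sketch of how a consumer caller might use the new accessors; the poll loop, dead-letter handling and seek-past-the-record strategy are illustrative assumptions, not part of this patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

public class DeserializationErrorHandling {
    static void pollOnce(KafkaConsumer<String, String> consumer) {
        try {
            consumer.poll(Duration.ofMillis(100));
        } catch (RecordDeserializationException e) {
            // origin() says whether the key or the value failed to deserialize.
            System.err.printf("Failed to deserialize %s at %s offset %d%n",
                e.origin(), e.topicPartition(), e.offset());

            // The raw bytes are still available, e.g. to forward to a dead-letter topic.
            ByteBuffer value = e.valueBuffer();
            if (value != null) {
                byte[] raw = new byte[value.remaining()];
                value.duplicate().get(raw);
                System.err.println("Raw value: " + new String(raw, StandardCharsets.UTF_8));
            }

            // Skip past the poison record so the next poll can make progress.
            consumer.seek(e.topicPartition(), e.offset() + 1);
        }
    }
}
```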
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ShareSessionNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/ShareSessionNotFoundException.java
new file mode 100644
index 0000000000000..2b2249f8a5831
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ShareSessionNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when the share session was not found.
+ */
+public class ShareSessionNotFoundException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public ShareSessionNotFoundException(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java b/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
index 573e2c47a082c..8c31e64cc5140 100644
--- a/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
@@ -26,7 +26,7 @@
/**
* Represents an immutable basic version range using 2 attributes: min and max, each of type short.
* The min and max attributes need to satisfy 2 rules:
- * - they are each expected to be >= 0, as we only consider positive version values to be valid.
+ * - they are each expected to be >= 0, as we only consider non-negative version values to be valid.
* - max should be >= min.
*
* The class also provides API to convert the version range to a map.
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
index 7367e966c014e..a77cc9309bd20 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
@@ -25,6 +25,8 @@
*/
public class MetricConfig {
+ public static final int DEFAULT_NUM_SAMPLES = 2;
+
private Quota quota;
private int samples;
private long eventWindow;
@@ -34,7 +36,7 @@ public class MetricConfig {
public MetricConfig() {
this.quota = null;
- this.samples = 2;
+ this.samples = DEFAULT_NUM_SAMPLES;
this.eventWindow = Long.MAX_VALUE;
this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);
this.tags = new LinkedHashMap<>();
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 09b7c05c8f283..4f15bb9607e1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -74,15 +74,18 @@ public long windowSize(MetricConfig config, long now) {
/*
* Here we check the total amount of time elapsed since the oldest non-obsolete window.
* This give the total windowSize of the batch which is the time used for Rate computation.
- * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
+ * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30-second
* window, the measured rate will be very high.
- * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+ * Hence, we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
*
* Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time,
* but this approach does not account for sleeps. SampledStat only creates samples whenever record is called,
* if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results.
+ *
+ * Note also, that totalElapsedTimeMs can be larger than the monitored window size,
+ * if the oldest sample started before the window while overlapping it.
*/
- long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
+ long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;
// Check how many full windows of data we have currently retained
int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
int minFullWindows = config.samples() - 1;
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
index df4a95ccaa9b6..f76fccc853bfa 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
@@ -40,7 +40,8 @@ public abstract class SampledStat implements MeasurableStat {
public SampledStat(double initialValue) {
this.initialValue = initialValue;
- this.samples = new ArrayList<>(2);
+ // keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic)
+ this.samples = new ArrayList<>(MetricConfig.DEFAULT_NUM_SAMPLES + 1);
}
@Override
@@ -50,10 +51,13 @@ public void record(MetricConfig config, double value, long timeMs) {
sample = advance(config, timeMs);
update(sample, config, value, timeMs);
sample.eventCount += 1;
+ sample.lastEventMs = timeMs;
}
private Sample advance(MetricConfig config, long timeMs) {
- this.current = (this.current + 1) % config.samples();
+ // keep one extra placeholder for "overlapping sample" (see purgeObsoleteSamples() logic)
+ int maxSamples = config.samples() + 1;
+ this.current = (this.current + 1) % maxSamples;
if (this.current >= samples.size()) {
Sample sample = newSample(timeMs);
this.samples.add(sample);
@@ -87,7 +91,7 @@ public Sample oldest(long now) {
Sample oldest = this.samples.get(0);
for (int i = 1; i < this.samples.size(); i++) {
Sample curr = this.samples.get(i);
- if (curr.lastWindowMs < oldest.lastWindowMs)
+ if (curr.startTimeMs < oldest.startTimeMs)
oldest = curr;
}
return oldest;
@@ -106,36 +110,42 @@ public String toString() {
public abstract double combine(List<Sample> samples, MetricConfig config, long now);
- /* Timeout any windows that have expired in the absence of any events */
+ // purge any samples that lack observed events within the monitored window
protected void purgeObsoleteSamples(MetricConfig config, long now) {
long expireAge = config.samples() * config.timeWindowMs();
for (Sample sample : samples) {
- if (now - sample.lastWindowMs >= expireAge)
+ // samples overlapping the monitored window are kept,
+ // even if they started before it
+ if (now - sample.lastEventMs >= expireAge) {
sample.reset(now);
+ }
}
}
protected static class Sample {
public double initialValue;
public long eventCount;
- public long lastWindowMs;
+ public long startTimeMs;
+ public long lastEventMs;
public double value;
public Sample(double initialValue, long now) {
this.initialValue = initialValue;
this.eventCount = 0;
- this.lastWindowMs = now;
+ this.startTimeMs = now;
+ this.lastEventMs = now;
this.value = initialValue;
}
public void reset(long now) {
this.eventCount = 0;
- this.lastWindowMs = now;
+ this.startTimeMs = now;
+ this.lastEventMs = now;
this.value = initialValue;
}
public boolean isComplete(long timeMs, MetricConfig config) {
- return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
+ return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
}
@Override
@@ -143,7 +153,8 @@ public String toString() {
return "Sample(" +
"value=" + value +
", eventCount=" + eventCount +
- ", lastWindowMs=" + lastWindowMs +
+ ", startTimeMs=" + startTimeMs +
+ ", lastEventMs=" + lastEventMs +
", initialValue=" + initialValue +
')';
}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java
index 931bd9c35e51f..a632f0254d6b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SimpleRate.java
@@ -33,7 +33,7 @@ public class SimpleRate extends Rate {
@Override
public long windowSize(MetricConfig config, long now) {
stat.purgeObsoleteSamples(config, now);
- long elapsed = now - stat.oldest(now).lastWindowMs;
+ long elapsed = now - stat.oldest(now).startTimeMs;
return Math.max(elapsed, config.timeWindowMs());
}
}
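
The window arithmetic that the hunks above feed into can be illustrated in isolation. The sketch below mirrors the elapsed-time and padding calculation of Rate.windowSize() using plain constants; the padding step is an assumption based on the surrounding code, not a line from this patch.

```java
public class RateWindowSketch {
    public static void main(String[] args) {
        long timeWindowMs = 30_000;      // MetricConfig.timeWindowMs()
        int samples = 2;                 // MetricConfig.samples(), i.e. DEFAULT_NUM_SAMPLES
        long now = 100_000;
        long oldestStartTimeMs = 55_000; // startTimeMs of the oldest retained sample

        // With startTimeMs the elapsed time covers the whole oldest sample, even if that
        // sample began before the monitored window (the "overlapping sample" case).
        long totalElapsedTimeMs = now - oldestStartTimeMs;              // 45_000
        int numFullWindows = (int) (totalElapsedTimeMs / timeWindowMs); // 1
        int minFullWindows = samples - 1;                               // 1

        // If fewer than N-1 full windows had elapsed, the window would be padded so that a
        // nearly empty sample set cannot produce an artificially high rate.
        if (numFullWindows < minFullWindows)
            totalElapsedTimeMs += (minFullWindows - numFullWindows) * timeWindowMs;

        System.out.println("window used for rate = " + totalElapsedTimeMs + " ms");
    }
}
```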
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index b1257c33c4615..1f91fb8d90ec0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -315,7 +315,7 @@ private void createServerCallbackHandlers(Map<String, ?> configs) {
String prefix = ListenerName.saslMechanismPrefix(mechanism);
@SuppressWarnings("unchecked")
Class<? extends AuthenticateCallbackHandler> clazz =
- (Class<? extends AuthenticateCallbackHandler>) configs.get(prefix + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS);
+ (Class<? extends AuthenticateCallbackHandler>) configs.get(prefix + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG);
if (clazz != null)
callbackHandler = Utils.newInstance(clazz);
else if (mechanism.equals(PlainSaslServer.PLAIN_MECHANISM))
@@ -333,9 +333,9 @@ else if (mechanism.equals(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM))
private void createConnectionsMaxReauthMsMap(Map<String, ?> configs) {
for (String mechanism : jaasContexts.keySet()) {
String prefix = ListenerName.saslMechanismPrefix(mechanism);
- Long connectionsMaxReauthMs = (Long) configs.get(prefix + BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS);
+ Long connectionsMaxReauthMs = (Long) configs.get(prefix + BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG);
if (connectionsMaxReauthMs == null)
- connectionsMaxReauthMs = (Long) configs.get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS);
+ connectionsMaxReauthMs = (Long) configs.get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG);
if (connectionsMaxReauthMs != null)
connectionsMaxReauthMsByMechanism.put(mechanism, connectionsMaxReauthMs);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 16bec4fb72dc6..ffd5737ca3162 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -118,7 +118,11 @@ public enum ApiKeys {
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
- DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS);
+ DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS),
+ SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT),
+ SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE),
+ SHARE_FETCH(ApiMessageType.SHARE_FETCH),
+ SHARE_ACKNOWLEDGE(ApiMessageType.SHARE_ACKNOWLEDGE);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 900d191c8f9d4..10ae05aa850c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -41,6 +41,7 @@
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.FetchSessionTopicIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -64,12 +65,14 @@
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
+import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
@@ -109,6 +112,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -394,7 +398,11 @@ public enum Errors {
UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new),
TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new),
INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new),
- TRANSACTION_ABORTABLE(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", TransactionAbortableException::new);
+ TRANSACTION_ABORTABLE(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", TransactionAbortableException::new),
+ INVALID_RECORD_STATE(121, "The record state is invalid. The acknowledgement of delivery could not be completed.", InvalidRecordStateException::new),
+ SHARE_SESSION_NOT_FOUND(122, "The share session was not found.", ShareSessionNotFoundException::new),
+ INVALID_SHARE_SESSION_EPOCH(123, "The share session epoch is invalid.", InvalidShareSessionEpochException::new),
+ FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index b51221f5af642..589e163992b22 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -326,6 +326,14 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return ListClientMetricsResourcesRequest.parse(buffer, apiVersion);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsRequest.parse(buffer, apiVersion);
+ case SHARE_GROUP_HEARTBEAT:
+ return ShareGroupHeartbeatRequest.parse(buffer, apiVersion);
+ case SHARE_GROUP_DESCRIBE:
+ return ShareGroupDescribeRequest.parse(buffer, apiVersion);
+ case SHARE_FETCH:
+ return ShareFetchRequest.parse(buffer, apiVersion);
+ case SHARE_ACKNOWLEDGE:
+ return ShareAcknowledgeRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index dbafdbf3bcb07..5534168098e9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -263,6 +263,14 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return ListClientMetricsResourcesResponse.parse(responseBuffer, version);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsResponse.parse(responseBuffer, version);
+ case SHARE_GROUP_HEARTBEAT:
+ return ShareGroupHeartbeatResponse.parse(responseBuffer, version);
+ case SHARE_GROUP_DESCRIBE:
+ return ShareGroupDescribeResponse.parse(responseBuffer, version);
+ case SHARE_FETCH:
+ return ShareFetchResponse.parse(responseBuffer, version);
+ case SHARE_ACKNOWLEDGE:
+ return ShareAcknowledgeResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
new file mode 100644
index 0000000000000..1b77b43be33c1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeRequest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
+import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ShareAcknowledgeRequest extends AbstractRequest {
+
+ public static class Builder extends AbstractRequest.Builder<ShareAcknowledgeRequest> {
+
+ private final ShareAcknowledgeRequestData data;
+
+ public Builder(ShareAcknowledgeRequestData data) {
+ this(data, false);
+ }
+
+ public Builder(ShareAcknowledgeRequestData data, boolean enableUnstableLastVersion) {
+ super(ApiKeys.SHARE_ACKNOWLEDGE, enableUnstableLastVersion);
+ this.data = data;
+ }
+
+ public static ShareAcknowledgeRequest.Builder forConsumer(String groupId, ShareFetchMetadata metadata,
+ Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgementsMap) {
+ ShareAcknowledgeRequestData data = new ShareAcknowledgeRequestData();
+ data.setGroupId(groupId);
+ if (metadata != null) {
+ data.setMemberId(metadata.memberId().toString());
+ data.setShareSessionEpoch(metadata.epoch());
+ }
+
+ // Build a map of topics to acknowledge keyed by topic ID, and within each a map of partitions keyed by index
+ Map<Uuid, Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition>> ackMap = new HashMap<>();
+
+ for (Map.Entry<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
+ TopicIdPartition tip = acknowledgeEntry.getKey();
+ Map<Integer, ShareAcknowledgeRequestData.AcknowledgePartition> partMap = ackMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
+ ShareAcknowledgeRequestData.AcknowledgePartition ackPartition = partMap.get(tip.partition());
+ if (ackPartition == null) {
+ ackPartition = new ShareAcknowledgeRequestData.AcknowledgePartition()
+ .setPartitionIndex(tip.partition());
+ partMap.put(tip.partition(), ackPartition);
+ }
+ ackPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
+ }
+
+ // Finally, build up the data to fetch
+ data.setTopics(new ArrayList<>());
+ ackMap.forEach((topicId, partMap) -> {
+ ShareAcknowledgeRequestData.AcknowledgeTopic ackTopic = new ShareAcknowledgeRequestData.AcknowledgeTopic()
+ .setTopicId(topicId)
+ .setPartitions(new ArrayList<>());
+ data.topics().add(ackTopic);
+
+ partMap.forEach((index, ackPartition) -> ackTopic.partitions().add(ackPartition));
+ });
+
+ return new ShareAcknowledgeRequest.Builder(data, true);
+ }
+
+ public ShareAcknowledgeRequestData data() {
+ return data;
+ }
+
+ @Override
+ public ShareAcknowledgeRequest build(short version) {
+ return new ShareAcknowledgeRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final ShareAcknowledgeRequestData data;
+
+ public ShareAcknowledgeRequest(ShareAcknowledgeRequestData data, short version) {
+ super(ApiKeys.SHARE_ACKNOWLEDGE, version);
+ this.data = data;
+ }
+
+ @Override
+ public ShareAcknowledgeRequestData data() {
+ return data;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ Errors error = Errors.forException(e);
+ return new ShareAcknowledgeResponse(new ShareAcknowledgeResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(error.code()));
+ }
+
+ public static ShareAcknowledgeRequest parse(ByteBuffer buffer, short version) {
+ return new ShareAcknowledgeRequest(
+ new ShareAcknowledgeRequestData(new ByteBufferAccessor(buffer), version),
+ version
+ );
+ }
+}
\ No newline at end of file
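
A rough sketch of how a client-side caller might drive the builder defined above. The group id and topic name are illustrative, the acknowledgement batch is left empty because its field setters belong to the generated ShareAcknowledgeRequestData schema rather than this file, and the RPC version is picked only to make the sketch concrete.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.requests.ShareAcknowledgeRequest;
import org.apache.kafka.common.requests.ShareFetchMetadata;

public class ShareAcknowledgeRequestSketch {
    public static void main(String[] args) {
        // A partition of a hypothetical "orders" topic, identified by topic ID.
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 0, "orders");

        // One (empty) acknowledgement batch for that partition.
        Map<TopicIdPartition, List<ShareAcknowledgeRequestData.AcknowledgementBatch>> acks =
            Collections.singletonMap(tip,
                Collections.singletonList(new ShareAcknowledgeRequestData.AcknowledgementBatch()));

        // Session metadata: epoch 1, i.e. the first acknowledgement after the session was opened.
        ShareFetchMetadata metadata = ShareFetchMetadata.initialEpoch(Uuid.randomUuid()).nextEpoch();

        ShareAcknowledgeRequest request = ShareAcknowledgeRequest.Builder
            .forConsumer("share-group-1", metadata, acks)
            .build((short) 0); // version 0 of the RPC, for illustration only
        System.out.println(request.data());
    }
}
```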
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
new file mode 100644
index 0000000000000..5cab233dccac8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
+ * - {@link Errors#UNKNOWN_TOPIC_ID}
+ * - {@link Errors#INVALID_RECORD_STATE}
+ * - {@link Errors#KAFKA_STORAGE_ERROR}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#UNKNOWN_SERVER_ERROR}
+ */
+public class ShareAcknowledgeResponse extends AbstractResponse {
+
+ private final ShareAcknowledgeResponseData data;
+
+ public ShareAcknowledgeResponse(ShareAcknowledgeResponseData data) {
+ super(ApiKeys.SHARE_ACKNOWLEDGE);
+ this.data = data;
+ }
+
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
+ }
+
+ @Override
+ public ShareAcknowledgeResponseData data() {
+ return data;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ HashMap<Errors, Integer> counts = new HashMap<>();
+ updateErrorCounts(counts, Errors.forCode(data.errorCode()));
+ data.responses().forEach(
+ topic -> topic.partitions().forEach(
+ partition -> updateErrorCounts(counts, Errors.forCode(partition.errorCode()))
+ )
+ );
+ return counts;
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+ data.setThrottleTimeMs(throttleTimeMs);
+ }
+
+ public static ShareAcknowledgeResponse parse(ByteBuffer buffer, short version) {
+ return new ShareAcknowledgeResponse(
+ new ShareAcknowledgeResponseData(new ByteBufferAccessor(buffer), version)
+ );
+ }
+
+ private static boolean matchingTopic(ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse previousTopic, TopicIdPartition currentTopic) {
+ if (previousTopic == null)
+ return false;
+ return previousTopic.topicId().equals(currentTopic.topicId());
+ }
+
+ public static ShareAcknowledgeResponseData.PartitionData partitionResponse(TopicIdPartition topicIdPartition, Errors error) {
+ return partitionResponse(topicIdPartition.topicPartition().partition(), error);
+ }
+
+ public static ShareAcknowledgeResponseData.PartitionData partitionResponse(int partition, Errors error) {
+ return new ShareAcknowledgeResponseData.PartitionData()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code());
+ }
+
+ public static ShareAcknowledgeResponse of(Errors error,
+ int throttleTimeMs,
+ LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> responseData,
+ List<Node> nodeEndpoints) {
+ return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
+ }
+
+ public static ShareAcknowledgeResponseData toMessage(Errors error, int throttleTimeMs,
+ Iterator<Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> partIterator,
+ List<Node> nodeEndpoints) {
+ Map<Uuid, ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse> topicResponseList = new LinkedHashMap<>();
+ while (partIterator.hasNext()) {
+ Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> entry = partIterator.next();
+ ShareAcknowledgeResponseData.PartitionData partitionData = entry.getValue();
+ // Since PartitionData alone doesn't know the partition ID, we set it here
+ partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+ // Checking if the topic is already present in the map
+ if (topicResponseList.containsKey(entry.getKey().topicId())) {
+ topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
+ } else {
+ List<ShareAcknowledgeResponseData.PartitionData> partitionResponses = new ArrayList<>();
+ partitionResponses.add(partitionData);
+ topicResponseList.put(entry.getKey().topicId(), new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse()
+ .setTopicId(entry.getKey().topicId())
+ .setPartitions(partitionResponses));
+ }
+ }
+ ShareAcknowledgeResponseData data = new ShareAcknowledgeResponseData();
+ // KafkaApis should only pass in node endpoints on error, otherwise this should be an empty list
+ nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(
+ new ShareAcknowledgeResponseData.NodeEndpoint()
+ .setNodeId(endpoint.id())
+ .setHost(endpoint.host())
+ .setPort(endpoint.port())
+ .setRack(endpoint.rack())));
+ return data.setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(error.code())
+ .setResponses(new ArrayList<>(topicResponseList.values()));
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchMetadata.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchMetadata.java
new file mode 100644
index 0000000000000..4e5bcc2237e43
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchMetadata.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Uuid;
+
+public class ShareFetchMetadata {
+ /**
+ * The first epoch. When used in a ShareFetch request, indicates that the client
+ * wants to create a session.
+ */
+ public static final int INITIAL_EPOCH = 0;
+
+ /**
+ * An invalid epoch. When used in a ShareFetch request, indicates that the client
+ * wants to close an existing session.
+ */
+ public static final int FINAL_EPOCH = -1;
+
+ /**
+ * Returns true if this metadata opens a new share session, i.e. the epoch is {@link #INITIAL_EPOCH}.
+ */
+ public boolean isNewSession() {
+ return epoch == INITIAL_EPOCH;
+ }
+
+ /**
+ * Returns true if this is a full share fetch request.
+ */
+ public boolean isFull() {
+ return (this.epoch == INITIAL_EPOCH) || (this.epoch == FINAL_EPOCH);
+ }
+
+ /**
+ * Returns the next epoch.
+ *
+ * @param prevEpoch The previous epoch.
+ * @return The next epoch.
+ */
+ public static int nextEpoch(int prevEpoch) {
+ if (prevEpoch < 0) {
+ // The next epoch after FINAL_EPOCH is always FINAL_EPOCH itself.
+ return FINAL_EPOCH;
+ } else if (prevEpoch == Integer.MAX_VALUE) {
+ return 1;
+ } else {
+ return prevEpoch + 1;
+ }
+ }
+
+ /**
+ * The member ID.
+ */
+ private final Uuid memberId;
+
+ /**
+ * The share session epoch.
+ */
+ private final int epoch;
+
+ public ShareFetchMetadata(Uuid memberId, int epoch) {
+ this.memberId = memberId;
+ this.epoch = epoch;
+ }
+
+ public static ShareFetchMetadata initialEpoch(Uuid memberId) {
+ return new ShareFetchMetadata(memberId, INITIAL_EPOCH);
+ }
+
+ public ShareFetchMetadata nextEpoch() {
+ return new ShareFetchMetadata(memberId, nextEpoch(epoch));
+ }
+
+ public ShareFetchMetadata nextCloseExistingAttemptNew() {
+ return new ShareFetchMetadata(memberId, INITIAL_EPOCH);
+ }
+
+ public ShareFetchMetadata finalEpoch() {
+ return new ShareFetchMetadata(memberId, FINAL_EPOCH);
+ }
+
+ public Uuid memberId() {
+ return memberId;
+ }
+
+ public int epoch() {
+ return epoch;
+ }
+
+ public boolean isFinalEpoch() {
+ return epoch == FINAL_EPOCH;
+ }
+
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(memberId=").append(memberId).append(", ");
+ if (epoch == INITIAL_EPOCH) {
+ bld.append("epoch=INITIAL)");
+ } else if (epoch == FINAL_EPOCH) {
+ bld.append("epoch=FINAL)");
+ } else {
+ bld.append("epoch=").append(epoch).append(")");
+ }
+ return bld.toString();
+ }
+}
\ No newline at end of file
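
The epoch rules encoded above can be traced end to end in a short sketch; everything here uses only methods defined in this file, and the member ID is a random placeholder.

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareFetchMetadata;

public class ShareSessionEpochSketch {
    public static void main(String[] args) {
        Uuid memberId = Uuid.randomUuid();

        ShareFetchMetadata first = ShareFetchMetadata.initialEpoch(memberId);
        System.out.println(first.isNewSession());   // true: epoch 0 asks the broker to create the session

        ShareFetchMetadata next = first.nextEpoch();
        System.out.println(next.epoch());           // 1: incremental requests bump the epoch

        ShareFetchMetadata last = next.finalEpoch();
        System.out.println(last.isFinalEpoch());    // true: epoch -1 closes the session

        // Wrap-around skips the reserved values: after Integer.MAX_VALUE the epoch goes back to 1.
        System.out.println(ShareFetchMetadata.nextEpoch(Integer.MAX_VALUE));
    }
}
```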
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
new file mode 100644
index 0000000000000..385e802a691a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ShareFetchRequestData;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ShareFetchRequest extends AbstractRequest {
+
+ public static class Builder extends AbstractRequest.Builder<ShareFetchRequest> {
+
+ private final ShareFetchRequestData data;
+
+ public Builder(ShareFetchRequestData data) {
+ this(data, false);
+ }
+
+ public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
+ super(ApiKeys.SHARE_FETCH, enableUnstableLastVersion);
+ this.data = data;
+ }
+
+ public static Builder forConsumer(String groupId, ShareFetchMetadata metadata,
+ int maxWait, int minBytes, int maxBytes, int fetchSize,
+ List<TopicIdPartition> send, List<TopicIdPartition> forget,
+ Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
+ ShareFetchRequestData data = new ShareFetchRequestData();
+ data.setGroupId(groupId);
+ int ackOnlyPartitionMaxBytes = fetchSize;
+ boolean isClosingShareSession = false;
+ if (metadata != null) {
+ data.setMemberId(metadata.memberId().toString());
+ data.setShareSessionEpoch(metadata.epoch());
+ if (metadata.isFinalEpoch()) {
+ isClosingShareSession = true;
+ ackOnlyPartitionMaxBytes = 0;
+ }
+ }
+ data.setMaxWaitMs(maxWait);
+ data.setMinBytes(minBytes);
+ data.setMaxBytes(maxBytes);
+
+ // Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
+ Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();
+
+ // First, start by adding the list of topic-partitions we are fetching
+ if (!isClosingShareSession) {
+ for (TopicIdPartition tip : send) {
+ Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
+ ShareFetchRequestData.FetchPartition fetchPartition = new ShareFetchRequestData.FetchPartition()
+ .setPartitionIndex(tip.partition())
+ .setPartitionMaxBytes(fetchSize);
+ partMap.put(tip.partition(), fetchPartition);
+ }
+ }
+
+ // Next, add acknowledgements that we are piggybacking onto the fetch. Generally, the list of
+ // topic-partitions will be a subset, but if the assignment changes, there might be new entries to add
+ for (Map.Entry<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgeEntry : acknowledgementsMap.entrySet()) {
+ TopicIdPartition tip = acknowledgeEntry.getKey();
+ Map<Integer, ShareFetchRequestData.FetchPartition> partMap = fetchMap.computeIfAbsent(tip.topicId(), k -> new HashMap<>());
+ ShareFetchRequestData.FetchPartition fetchPartition = partMap.get(tip.partition());
+ if (fetchPartition == null) {
+ fetchPartition = new ShareFetchRequestData.FetchPartition()
+ .setPartitionIndex(tip.partition())
+ .setPartitionMaxBytes(ackOnlyPartitionMaxBytes);
+ partMap.put(tip.partition(), fetchPartition);
+ }
+ fetchPartition.setAcknowledgementBatches(acknowledgeEntry.getValue());
+ }
+
+ // Build up the data to fetch
+ if (!fetchMap.isEmpty()) {
+ data.setTopics(new ArrayList<>());
+ fetchMap.forEach((topicId, partMap) -> {
+ ShareFetchRequestData.FetchTopic fetchTopic = new ShareFetchRequestData.FetchTopic()
+ .setTopicId(topicId)
+ .setPartitions(new ArrayList<>());
+ partMap.forEach((index, fetchPartition) -> fetchTopic.partitions().add(fetchPartition));
+ data.topics().add(fetchTopic);
+ });
+ }
+
+ // And finally, forget the topic-partitions that are no longer in the session
+ if (!forget.isEmpty()) {
+ Map<Uuid, List<Integer>> forgetMap = new HashMap<>();
+ for (TopicIdPartition tip : forget) {
+ List<Integer> partList = forgetMap.computeIfAbsent(tip.topicId(), k -> new ArrayList<>());
+ partList.add(tip.partition());
+ }
+ data.setForgottenTopicsData(new ArrayList<>());
+ forgetMap.forEach((topicId, partList) -> {
+ ShareFetchRequestData.ForgottenTopic forgetTopic = new ShareFetchRequestData.ForgottenTopic()
+ .setTopicId(topicId)
+ .setPartitions(new ArrayList<>());
+ partList.forEach(index -> forgetTopic.partitions().add(index));
+ data.forgottenTopicsData().add(forgetTopic);
+ });
+ }
+
+ return new Builder(data, true);
+ }
+
+ public ShareFetchRequestData data() {
+ return data;
+ }
+
+ @Override
+ public ShareFetchRequest build(short version) {
+ return new ShareFetchRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final ShareFetchRequestData data;
+ private volatile LinkedHashMap<TopicIdPartition, SharePartitionData> shareFetchData = null;
+ private volatile List<TopicIdPartition> toForget = null;
+
+ public ShareFetchRequest(ShareFetchRequestData data, short version) {
+ super(ApiKeys.SHARE_FETCH, version);
+ this.data = data;
+ }
+
+ @Override
+ public ShareFetchRequestData data() {
+ return data;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ Errors error = Errors.forException(e);
+ return new ShareFetchResponse(new ShareFetchResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(error.code()));
+ }
+
+ public static ShareFetchRequest parse(ByteBuffer buffer, short version) {
+ return new ShareFetchRequest(
+ new ShareFetchRequestData(new ByteBufferAccessor(buffer), version),
+ version
+ );
+ }
+
+ public static final class SharePartitionData {
+ public final Uuid topicId;
+ public final int maxBytes;
+
+ public SharePartitionData(
+ Uuid topicId,
+ int maxBytes
+ ) {
+ this.topicId = topicId;
+ this.maxBytes = maxBytes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ShareFetchRequest.SharePartitionData that = (ShareFetchRequest.SharePartitionData) o;
+ return Objects.equals(topicId, that.topicId) &&
+ maxBytes == that.maxBytes;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicId, maxBytes);
+ }
+
+ @Override
+ public String toString() {
+ return "SharePartitionData(" +
+ "topicId=" + topicId +
+ ", maxBytes=" + maxBytes +
+ ')';
+ }
+ }
+
+ public int minBytes() {
+ return data.minBytes();
+ }
+
+ public int maxBytes() {
+ return data.maxBytes();
+ }
+
+ public int maxWait() {
+ return data.maxWaitMs();
+ }
+
+ public Map<TopicIdPartition, SharePartitionData> shareFetchData(Map<Uuid, String> topicNames) {
+ if (shareFetchData == null) {
+ synchronized (this) {
+ if (shareFetchData == null) {
+ // Assigning the lazy-initialized `shareFetchData` in the last step
+ // to avoid other threads accessing a half-initialized object.
+ final LinkedHashMap<TopicIdPartition, SharePartitionData> shareFetchDataTmp = new LinkedHashMap<>();
+ data.topics().forEach(shareFetchTopic -> {
+ String name = topicNames.get(shareFetchTopic.topicId());
+ shareFetchTopic.partitions().forEach(shareFetchPartition -> {
+ // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
+ shareFetchDataTmp.put(new TopicIdPartition(shareFetchTopic.topicId(), new TopicPartition(name, shareFetchPartition.partitionIndex())),
+ new ShareFetchRequest.SharePartitionData(
+ shareFetchTopic.topicId(),
+ shareFetchPartition.partitionMaxBytes()
+ )
+ );
+ });
+ });
+ shareFetchData = shareFetchDataTmp;
+ }
+ }
+ }
+ return shareFetchData;
+ }
+
+ public List<TopicIdPartition> forgottenTopics(Map<Uuid, String> topicNames) {
+ if (toForget == null) {
+ synchronized (this) {
+ if (toForget == null) {
+ // Assigning the lazy-initialized `toForget` in the last step
+ // to avoid other threads accessing a half-initialized object.
+ final List<TopicIdPartition> toForgetTmp = new ArrayList<>();
+ data.forgottenTopicsData().forEach(forgottenTopic -> {
+ String name = topicNames.get(forgottenTopic.topicId());
+ // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
+ forgottenTopic.partitions().forEach(partitionId -> toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
+ });
+ toForget = toForgetTmp;
+ }
+ }
+ }
+ return toForget;
+ }
+}
\ No newline at end of file
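
A hedged sketch of assembling a first ShareFetch request with the builder above; the group id, topic name, byte limits and RPC version are illustrative values rather than anything prescribed by this patch.

```java
import java.util.Collections;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;

public class ShareFetchRequestSketch {
    public static void main(String[] args) {
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 0, "orders");

        // Epoch 0: this request also opens the share session.
        ShareFetchMetadata metadata = ShareFetchMetadata.initialEpoch(Uuid.randomUuid());

        ShareFetchRequest.Builder builder = ShareFetchRequest.Builder.forConsumer(
            "share-group-1",                // groupId
            metadata,
            500,                            // maxWait (ms)
            1,                              // minBytes
            1024 * 1024,                    // maxBytes for the whole response
            64 * 1024,                      // fetchSize per partition
            Collections.singletonList(tip), // partitions to fetch
            Collections.emptyList(),        // nothing to forget on the first request
            Collections.emptyMap());        // no piggybacked acknowledgements yet

        ShareFetchRequest request = builder.build((short) 0); // version 0, for illustration
        System.out.println(request.maxWait() + " ms, up to " + request.maxBytes() + " bytes");
    }
}
```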
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
new file mode 100644
index 0000000000000..b33969e0efa41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ShareFetchResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+/**
+ * Possible error codes.
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
+ * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
+ * - {@link Errors#UNKNOWN_TOPIC_ID}
+ * - {@link Errors#INVALID_RECORD_STATE}
+ * - {@link Errors#KAFKA_STORAGE_ERROR}
+ * - {@link Errors#CORRUPT_MESSAGE}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#UNKNOWN_SERVER_ERROR}
+ */
+public class ShareFetchResponse extends AbstractResponse {
+
+ private final ShareFetchResponseData data;
+
+ private volatile LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = null;
+
+ public ShareFetchResponse(ShareFetchResponseData data) {
+ super(ApiKeys.SHARE_FETCH);
+ this.data = data;
+ }
+
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
+ }
+
+ @Override
+ public ShareFetchResponseData data() {
+ return data;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ HashMap<Errors, Integer> counts = new HashMap<>();
+ updateErrorCounts(counts, Errors.forCode(data.errorCode()));
+ data.responses().forEach(
+ topic -> topic.partitions().forEach(
+ partition -> updateErrorCounts(counts, Errors.forCode(partition.errorCode()))
+ )
+ );
+ return counts;
+ }
+
+ public LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames) {
+ if (responseData == null) {
+ synchronized (this) {
+ // Assigning the lazy-initialized `responseData` in the last step
+ // to avoid other threads accessing a half-initialized object.
+ if (responseData == null) {
+ final LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseDataTmp = new LinkedHashMap<>();
+ data.responses().forEach(topicResponse -> {
+ String name = topicNames.get(topicResponse.topicId());
+ if (name != null) {
+ topicResponse.partitions().forEach(partitionData -> responseDataTmp.put(new TopicIdPartition(topicResponse.topicId(),
+ new TopicPartition(name, partitionData.partitionIndex())), partitionData));
+ }
+ });
+ responseData = responseDataTmp;
+ }
+ }
+ }
+ return responseData;
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+ data.setThrottleTimeMs(throttleTimeMs);
+ }
+
+ public static ShareFetchResponse parse(ByteBuffer buffer, short version) {
+ return new ShareFetchResponse(
+ new ShareFetchResponseData(new ByteBufferAccessor(buffer), version)
+ );
+ }
+
+ /**
+ * Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`.
+ *
+ *
+ * If this response was deserialized after a share fetch, this method should never fail. An example where this would
+ * fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and
+ * sent on the wire).
+ *
+ * @param partition partition data
+ * @return Records or empty record if the records in PartitionData is null.
+ */
+ public static Records recordsOrFail(ShareFetchResponseData.PartitionData partition) {
+ if (partition.records() == null) return MemoryRecords.EMPTY;
+ if (partition.records() instanceof Records) return (Records) partition.records();
+ throw new ClassCastException("The record type is " + partition.records().getClass().getSimpleName() + ", which is not a subtype of " +
+ Records.class.getSimpleName() + ". This method is only safe to call if the `ShareFetchResponse` was deserialized from bytes.");
+ }
+
+ /**
+ * Convenience method to find the size of a response.
+ *
+ * @param version The version of the request
+ * @param partIterator The partition iterator.
+ * @return The response size in bytes.
+ */
+ public static int sizeOf(short version,
+ Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator) {
+ // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
+ // use arbitrary values here without affecting the result.
+ ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, Collections.emptyList());
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ return 4 + data.size(cache, version);
+ }
+
+ /**
+ * @return The size in bytes of the records. 0 is returned if records of input partition is null.
+ */
+ public static int recordsSize(ShareFetchResponseData.PartitionData partition) {
+ return partition.records() == null ? 0 : partition.records().sizeInBytes();
+ }
+
+ public static ShareFetchResponse of(Errors error,
+ int throttleTimeMs,
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData,
+ List<Node> nodeEndpoints) {
+ return new ShareFetchResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
+ }
+
+ public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
+ Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partIterator,
+ List<Node> nodeEndpoints) {
+ Map<Uuid, ShareFetchResponseData.ShareFetchableTopicResponse> topicResponseList = new LinkedHashMap<>();
+ while (partIterator.hasNext()) {
+ Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData> entry = partIterator.next();
+ ShareFetchResponseData.PartitionData partitionData = entry.getValue();
+ // Since PartitionData alone doesn't know the partition ID, we set it here
+ partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+ // Checking if the topic is already present in the map
+ if (topicResponseList.containsKey(entry.getKey().topicId())) {
+ topicResponseList.get(entry.getKey().topicId()).partitions().add(partitionData);
+ } else {
+ List<ShareFetchResponseData.PartitionData> partitionResponses = new ArrayList<>();
+ partitionResponses.add(partitionData);
+ topicResponseList.put(entry.getKey().topicId(), new ShareFetchResponseData.ShareFetchableTopicResponse()
+ .setTopicId(entry.getKey().topicId())
+ .setPartitions(partitionResponses));
+ }
+ }
+ ShareFetchResponseData data = new ShareFetchResponseData();
+ // KafkaApis should only pass in node endpoints on error, otherwise this should be an empty list
+ nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(
+ new ShareFetchResponseData.NodeEndpoint()
+ .setNodeId(endpoint.id())
+ .setHost(endpoint.host())
+ .setPort(endpoint.port())
+ .setRack(endpoint.rack())));
+ return data.setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(error.code())
+ .setResponses(new ArrayList<>(topicResponseList.values()));
+ }
+
+ public static ShareFetchResponseData.PartitionData partitionResponse(TopicIdPartition topicIdPartition, Errors error) {
+ return partitionResponse(topicIdPartition.topicPartition().partition(), error);
+ }
+
+ public static ShareFetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
+ return new ShareFetchResponseData.PartitionData()
+ .setPartitionIndex(partition)
+ .setErrorCode(error.code());
+ }
+}
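
A hedged sketch of how a broker-side caller might assemble a response from the helpers above; the error choice and the empty endpoint list are illustrative, and per the comment in toMessage() node endpoints would only be populated on error paths.

```java
import java.util.Collections;
import java.util.LinkedHashMap;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;

public class ShareFetchResponseSketch {
    public static void main(String[] args) {
        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 0, "orders");

        // A single partition that failed with INVALID_RECORD_STATE, one of the new error codes.
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> parts = new LinkedHashMap<>();
        parts.put(tip, ShareFetchResponse.partitionResponse(tip, Errors.INVALID_RECORD_STATE));

        // Top-level error NONE, no throttling, no node endpoints.
        ShareFetchResponse response = ShareFetchResponse.of(Errors.NONE, 0, parts, Collections.emptyList());
        System.out.println(response.errorCounts());
    }
}
```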
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
new file mode 100644
index 0000000000000..25c02e4a83c5e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeRequest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ShareGroupDescribeRequest extends AbstractRequest {
+
+ public static class Builder extends AbstractRequest.Builder<ShareGroupDescribeRequest> {
+
+ private final ShareGroupDescribeRequestData data;
+
+ public Builder(ShareGroupDescribeRequestData data) {
+ this(data, false);
+ }
+
+ public Builder(ShareGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
+ super(ApiKeys.SHARE_GROUP_DESCRIBE, enableUnstableLastVersion);
+ this.data = data;
+ }
+
+ @Override
+ public ShareGroupDescribeRequest build(short version) {
+ return new ShareGroupDescribeRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final ShareGroupDescribeRequestData data;
+
+ public ShareGroupDescribeRequest(ShareGroupDescribeRequestData data, short version) {
+ super(ApiKeys.SHARE_GROUP_DESCRIBE, version);
+ this.data = data;
+ }
+
+ @Override
+ public ShareGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ ShareGroupDescribeResponseData data = new ShareGroupDescribeResponseData()
+ .setThrottleTimeMs(throttleTimeMs);
+ // Set error for each group
+ short errorCode = Errors.forException(e).code();
+ this.data.groupIds().forEach(
+ groupId -> data.groups().add(
+ new ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(errorCode)
+ )
+ );
+ return new ShareGroupDescribeResponse(data);
+ }
+
+ @Override
+ public ShareGroupDescribeRequestData data() {
+ return data;
+ }
+
+ public static ShareGroupDescribeRequest parse(ByteBuffer buffer, short version) {
+ return new ShareGroupDescribeRequest(
+ new ShareGroupDescribeRequestData(new ByteBufferAccessor(buffer), version),
+ version
+ );
+ }
+
+ public static List<ShareGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
+ List<String> groupIds,
+ Errors error
+ ) {
+ return groupIds.stream()
+ .map(groupId -> new ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ ).collect(Collectors.toList());
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
new file mode 100644
index 0000000000000..95dd371eedfa7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupDescribeResponse.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#INVALID_GROUP_ID}
+ * - {@link Errors#GROUP_ID_NOT_FOUND}
+ */
+public class ShareGroupDescribeResponse extends AbstractResponse {
+
+ private final ShareGroupDescribeResponseData data;
+
+ public ShareGroupDescribeResponse(ShareGroupDescribeResponseData data) {
+ super(ApiKeys.SHARE_GROUP_DESCRIBE);
+ this.data = data;
+ }
+
+ @Override
+ public ShareGroupDescribeResponseData data() {
+ return data;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ HashMap<Errors, Integer> counts = new HashMap<>();
+ data.groups().forEach(
+ group -> updateErrorCounts(counts, Errors.forCode(group.errorCode()))
+ );
+ return counts;
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+ data.setThrottleTimeMs(throttleTimeMs);
+ }
+
+ public static ShareGroupDescribeResponse parse(ByteBuffer buffer, short version) {
+ return new ShareGroupDescribeResponse(
+ new ShareGroupDescribeResponseData(new ByteBufferAccessor(buffer), version)
+ );
+ }
+}
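Because the describe response carries an error code per group rather than a single top-level code, errorCounts() tallies across the groups list. A small sketch of that aggregation (illustrative only, not part of the patch; it uses the same DescribedGroup setters the patch itself calls):

import java.util.Map;

import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareGroupDescribeResponse;

public class ShareGroupDescribeErrorCountsExample {
    public static void main(String[] args) {
        ShareGroupDescribeResponseData responseData = new ShareGroupDescribeResponseData();
        responseData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
            .setGroupId("share-group-a")
            .setErrorCode(Errors.NONE.code()));
        responseData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
            .setGroupId("share-group-b")
            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()));

        ShareGroupDescribeResponse response = new ShareGroupDescribeResponse(responseData);
        // errorCounts() walks the per-group error codes: {NONE=1, GROUP_ID_NOT_FOUND=1}
        Map<Errors, Integer> counts = response.errorCounts();
        System.out.println(counts);
    }
}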
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatRequest.java
new file mode 100644
index 0000000000000..7e112ef29dd14
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatRequest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+public class ShareGroupHeartbeatRequest extends AbstractRequest {
+ /**
+ * A member epoch of -1 means that the member wants to leave the group.
+ */
+ public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
+
+ /**
+ * A member epoch of 0 means that the member wants to join the group.
+ */
+ public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
+
+ public static class Builder extends AbstractRequest.Builder<ShareGroupHeartbeatRequest> {
+ private final ShareGroupHeartbeatRequestData data;
+
+ public Builder(ShareGroupHeartbeatRequestData data) {
+ this(data, true);
+ }
+
+ public Builder(ShareGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
+ super(ApiKeys.SHARE_GROUP_HEARTBEAT, enableUnstableLastVersion);
+ this.data = data;
+ }
+
+ @Override
+ public ShareGroupHeartbeatRequest build(short version) {
+ return new ShareGroupHeartbeatRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final ShareGroupHeartbeatRequestData data;
+
+ public ShareGroupHeartbeatRequest(ShareGroupHeartbeatRequestData data, short version) {
+ super(ApiKeys.SHARE_GROUP_HEARTBEAT, version);
+ this.data = data;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new ShareGroupHeartbeatResponse(
+ new ShareGroupHeartbeatResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(Errors.forException(e).code())
+ );
+ }
+
+ @Override
+ public ShareGroupHeartbeatRequestData data() {
+ return data;
+ }
+
+ public static ShareGroupHeartbeatRequest parse(ByteBuffer buffer, short version) {
+ return new ShareGroupHeartbeatRequest(new ShareGroupHeartbeatRequestData(
+ new ByteBufferAccessor(buffer), version), version);
+ }
+}
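The two epoch constants encode the join/leave convention on the wire: a member joins with epoch 0 and no member id, and leaves by echoing its assigned id with epoch -1. A hedged sketch of both payloads (not part of the patch; setGroupId, setMemberId, setMemberEpoch and setSubscribedTopicNames are setter names assumed to be generated from the heartbeat request schema):

import java.util.Collections;

import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;

public class ShareGroupHeartbeatEpochExample {
    public static void main(String[] args) {
        // Joining: epoch 0 and no member id yet; the coordinator assigns one.
        ShareGroupHeartbeatRequestData join = new ShareGroupHeartbeatRequestData()
            .setGroupId("share-group-a")
            .setMemberEpoch(ShareGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH)
            .setSubscribedTopicNames(Collections.singletonList("orders"));

        // Leaving: the member echoes the id it was assigned and sends epoch -1.
        ShareGroupHeartbeatRequestData leave = new ShareGroupHeartbeatRequestData()
            .setGroupId("share-group-a")
            .setMemberId("member-id-from-coordinator")
            .setMemberEpoch(ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH);

        // Version 0 is used directly here to sidestep builder version negotiation.
        System.out.println(new ShareGroupHeartbeatRequest(join, (short) 0).data());
        System.out.println(new ShareGroupHeartbeatRequest(leave, (short) 0).data());
    }
}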
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
new file mode 100644
index 0000000000000..de05d44aebecb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
+ * - {@link Errors#GROUP_MAX_SIZE_REACHED}
+ */
+public class ShareGroupHeartbeatResponse extends AbstractResponse {
+ private final ShareGroupHeartbeatResponseData data;
+
+ public ShareGroupHeartbeatResponse(ShareGroupHeartbeatResponseData data) {
+ super(ApiKeys.SHARE_GROUP_HEARTBEAT);
+ this.data = data;
+ }
+
+ @Override
+ public ShareGroupHeartbeatResponseData data() {
+ return data;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+ data.setThrottleTimeMs(throttleTimeMs);
+ }
+
+ public static ShareGroupHeartbeatResponse parse(ByteBuffer buffer, short version) {
+ return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData(
+ new ByteBufferAccessor(buffer), version));
+ }
+}
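In contrast to the describe response, the heartbeat response carries a single top-level error code, so errorCounts() always returns a one-entry map. A minimal sketch (illustrative only, not part of the patch):

import java.util.Map;

import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;

public class ShareGroupHeartbeatErrorCountsExample {
    public static void main(String[] args) {
        ShareGroupHeartbeatResponse response = new ShareGroupHeartbeatResponse(
            new ShareGroupHeartbeatResponseData()
                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                .setThrottleTimeMs(100));

        // One top-level error code, so the map always has exactly one entry.
        Map<Errors, Integer> counts = response.errorCounts(); // {UNKNOWN_MEMBER_ID=1}
        System.out.println(counts);
    }
}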
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java
index c1793ebc3192b..d55ad04636f98 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java
@@ -21,7 +21,7 @@
public class SaslInternalConfigs {
/**
* The server (broker) specifies a positive session length in milliseconds to a
- * SASL client when {@link BrokerSecurityConfigs#CONNECTIONS_MAX_REAUTH_MS} is
+ * SASL client when {@link BrokerSecurityConfigs#CONNECTIONS_MAX_REAUTH_MS_CONFIG} is
* positive as per KIP
* 368: Allow SASL Connections to Periodically Re-Authenticate. The session
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 06e8dcd0bca5b..ee0ed8007d83c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -182,7 +182,7 @@ public SaslServerAuthenticator(Map<String, ?> configs,
throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + mechanism);
if (!subjects.containsKey(mechanism))
throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + mechanism);
- LOG.trace("{} for mechanism={}: {}", BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, mechanism,
+ LOG.trace("{} for mechanism={}: {}", BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG, mechanism,
connectionsMaxReauthMsByMechanism.get(mechanism));
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
index 2d296b545bbca..d39a06787fb54 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
@@ -55,7 +55,7 @@
*
*
* This {@link AuthenticateCallbackHandler} is enabled in the broker configuration by setting the
- * {@link org.apache.kafka.common.config.internals.BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS}
+ * {@link org.apache.kafka.common.config.internals.BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG}
* like so:
*
*
@@ -86,7 +86,7 @@
* validation callback handler:
*
*