Skip to content

Commit

Permalink
Update to Kafka 3.8.0 (#1313)
Browse files Browse the repository at this point in the history
📦 Updates [org.apache.kafka:kafka-clients](https://kafka.apache.org/)
from 3.7.1 to 3.8.0

📦 Updates
[io.github.embeddedkafka:embedded-kafka](https://github.com/embeddedkafka/embedded-kafka)
from `3.7.1.1` to `3.8.0`

📜 [GitHub Release
Notes](https://github.com/embeddedkafka/embedded-kafka/releases/tag/v3.8.0)
- [Version
Diff](embeddedkafka/embedded-kafka@v3.7.1.1...v3.8.0)

---------

Co-authored-by: zio-scala-steward[bot] <145262613+zio-scala-steward[bot]@users.noreply.github.com>
  • Loading branch information
erikvanoosten and zio-scala-steward[bot] authored Aug 25, 2024
1 parent 35530e8 commit 4983ef9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 20 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import MimaSettings.mimaSettings
*/
lazy val binCompatVersionToCompare = None // Some("2.8.0")

lazy val kafkaVersion = "3.7.1"
lazy val embeddedKafkaVersion = "3.7.1.1" // Should be the same as kafkaVersion, except for the patch part
lazy val kafkaVersion = "3.8.0"
lazy val embeddedKafkaVersion = "3.8.0" // Should be the same as kafkaVersion, except for the patch part

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.7"
Expand Down
35 changes: 17 additions & 18 deletions zio-kafka-testkit/src/main/scala/zio/kafka/testkit/Kafka.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package zio.kafka.testkit

import _root_.kafka.server.KafkaConfig
import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig }
import zio._

Expand Down Expand Up @@ -78,23 +77,23 @@ object Kafka {
embeddedWithBrokerProps(
ports =>
Map(
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"super.users" -> "User:ANONYMOUS",
"ssl.client.auth" -> "required",
"ssl.enabled.protocols" -> "TLSv1.2",
"ssl.truststore.type" -> "JKS",
"ssl.keystore.type" -> "JKS",
"ssl.truststore.location" -> KafkaTestUtils.trustStoreFile.getAbsolutePath,
"ssl.truststore.password" -> "123456",
"ssl.keystore.location" -> KafkaTestUtils.keyStoreFile.getAbsolutePath,
"ssl.keystore.password" -> "123456",
"ssl.key.password" -> "123456",
KafkaConfig.InterBrokerListenerNameProp -> "SSL",
KafkaConfig.ListenersProp -> s"SSL://localhost:${ports.kafkaPort}",
KafkaConfig.AdvertisedListenersProp -> s"SSL://localhost:${ports.kafkaPort}",
KafkaConfig.ZkConnectionTimeoutMsProp -> s"${30.second.toMillis}"
"group.min.session.timeout.ms" -> "500",
"group.initial.rebalance.delay.ms" -> "0",
"authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer",
"super.users" -> "User:ANONYMOUS",
"ssl.client.auth" -> "required",
"ssl.enabled.protocols" -> "TLSv1.2",
"ssl.truststore.type" -> "JKS",
"ssl.keystore.type" -> "JKS",
"ssl.truststore.location" -> KafkaTestUtils.trustStoreFile.getAbsolutePath,
"ssl.truststore.password" -> "123456",
"ssl.keystore.location" -> KafkaTestUtils.keyStoreFile.getAbsolutePath,
"ssl.keystore.password" -> "123456",
"ssl.key.password" -> "123456",
"inter.broker.listener.name" -> "SSL",
"listeners" -> s"SSL://localhost:${ports.kafkaPort}",
"advertised.listeners" -> s"SSL://localhost:${ports.kafkaPort}",
"zookeeper.connection.timeout.ms" -> s"${30.second.toMillis}"
),
customBrokerProps
)
Expand Down
10 changes: 10 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,14 @@ object AdminClient {
override def asJava: JConsumerGroupState = JConsumerGroupState.EMPTY
}

case object Assigning extends ConsumerGroupState {
override def asJava: JConsumerGroupState = JConsumerGroupState.ASSIGNING
}

case object Reconciling extends ConsumerGroupState {
override def asJava: JConsumerGroupState = JConsumerGroupState.RECONCILING
}

def apply(state: JConsumerGroupState): ConsumerGroupState =
state match {
case JConsumerGroupState.UNKNOWN => ConsumerGroupState.Unknown
Expand All @@ -1054,6 +1062,8 @@ object AdminClient {
case JConsumerGroupState.STABLE => ConsumerGroupState.Stable
case JConsumerGroupState.DEAD => ConsumerGroupState.Dead
case JConsumerGroupState.EMPTY => ConsumerGroupState.Empty
case JConsumerGroupState.ASSIGNING => ConsumerGroupState.Assigning
case JConsumerGroupState.RECONCILING => ConsumerGroupState.Reconciling
}
}

Expand Down

0 comments on commit 4983ef9

Please sign in to comment.