diff --git a/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala b/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala index 49d7f5ba4..daba7d3ce 100644 --- a/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala +++ b/testkit/src/main/scala/akka/kafka/testkit/internal/TestcontainersKafka.scala @@ -21,7 +21,7 @@ object TestcontainersKafka { private var kafkaPortInternal: Int = -1 private def requireStarted(): Unit = - require(kafkaPortInternal != -1, "Testcontainers Kafka hasn't been started via `setUp`") + require(cluster != null || kafkaPortInternal != -1, "Testcontainers Kafka hasn't been started via `setUp`") /** * Override this to change default settings for starting the Kafka testcontainers cluster. @@ -39,18 +39,29 @@ object TestcontainersKafka { kafkaBootstrapServersInternal } - def brokerContainers: Vector[AlpakkaKafkaContainer] = cluster.getBrokers.asScala.toVector + def brokerContainers: Vector[AlpakkaKafkaContainer] = { + requireStarted() + cluster.getBrokers.asScala.toVector + } - def zookeeperContainer: GenericContainer[_] = cluster.getZooKeeper + def zookeeperContainer: GenericContainer[_] = { + requireStarted() + cluster.getZooKeeper + } - def schemaRegistryContainer: Option[SchemaRegistryContainer] = cluster.getSchemaRegistry.asScala + def schemaRegistryContainer: Option[SchemaRegistryContainer] = { + requireStarted() + cluster.getSchemaRegistry.asScala + } - def getSchemaRegistryUrl: String = + def getSchemaRegistryUrl: String = { + requireStarted() cluster.getSchemaRegistry.asScala .map(_.getSchemaRegistryUrl) .getOrElse( throw new RuntimeException("Did you enable schema registry in your KafkaTestkitTestcontainersSettings?") ) + } def startCluster(): String = startCluster(testcontainersSettings)