Skip to content

Commit

Permalink
testkit: fail better if cluster hasn't started (#1492)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuvalshi0 authored May 1, 2022
1 parent 460bf87 commit d4949e3
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object TestcontainersKafka {
private var kafkaPortInternal: Int = -1

private def requireStarted(): Unit =
require(kafkaPortInternal != -1, "Testcontainers Kafka hasn't been started via `setUp`")
require(cluster != null || kafkaPortInternal != -1, "Testcontainers Kafka hasn't been started via `setUp`")

/**
* Override this to change default settings for starting the Kafka testcontainers cluster.
Expand All @@ -39,18 +39,29 @@ object TestcontainersKafka {
kafkaBootstrapServersInternal
}

def brokerContainers: Vector[AlpakkaKafkaContainer] = cluster.getBrokers.asScala.toVector
def brokerContainers: Vector[AlpakkaKafkaContainer] = {
requireStarted()
cluster.getBrokers.asScala.toVector
}

def zookeeperContainer: GenericContainer[_] = cluster.getZooKeeper
def zookeeperContainer: GenericContainer[_] = {
requireStarted()
cluster.getZooKeeper
}

def schemaRegistryContainer: Option[SchemaRegistryContainer] = cluster.getSchemaRegistry.asScala
def schemaRegistryContainer: Option[SchemaRegistryContainer] = {
requireStarted()
cluster.getSchemaRegistry.asScala
}

def getSchemaRegistryUrl: String =
def getSchemaRegistryUrl: String = {
requireStarted()
cluster.getSchemaRegistry.asScala
.map(_.getSchemaRegistryUrl)
.getOrElse(
throw new RuntimeException("Did you enable schema registry in your KafkaTestkitTestcontainersSettings?")
)
}

def startCluster(): String = startCluster(testcontainersSettings)

Expand Down

0 comments on commit d4949e3

Please sign in to comment.